123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514 |
- // Copyright 2021-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package service
- import (
- "archive/zip"
- "encoding/json"
- "fmt"
- "os"
- "path"
- "path/filepath"
- "strings"
- "sync"
- kconf "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/pkg/filex"
- "github.com/lf-edge/ekuiper/internal/pkg/httpx"
- "github.com/lf-edge/ekuiper/internal/pkg/store"
- "github.com/lf-edge/ekuiper/internal/plugin"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/kv"
- )
- var (
- once sync.Once
- mutex sync.Mutex
- singleton *Manager // Do not call this directly, use GetServiceManager
- )
- type Manager struct {
- executorPool *sync.Map // The pool of executors
- loaded bool
- serviceBuf *sync.Map
- functionBuf *sync.Map
- etcDir string
- serviceInstallKV kv.KeyValue
- serviceStatusInstallKV kv.KeyValue
- serviceKV kv.KeyValue
- functionKV kv.KeyValue
- }
- func InitManager() (*Manager, error) {
- mutex.Lock()
- defer mutex.Unlock()
- if singleton == nil {
- dir := "data/services"
- if kconf.IsTesting {
- dir = "service/test"
- }
- etcDir, err := kconf.GetLoc(dir)
- if err != nil {
- return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
- }
- sdb, err := store.GetKV("services")
- if err != nil {
- return nil, fmt.Errorf("cannot open service db: %s", err)
- }
- fdb, err := store.GetKV("serviceFuncs")
- if err != nil {
- return nil, fmt.Errorf("cannot open function db: %s", err)
- }
- sInstallDb, err := store.GetKV("serviceInstall")
- if err != nil {
- return nil, fmt.Errorf("cannot open service db: %s", err)
- }
- statusDb, err := store.GetKV("serviceInstallStatus")
- if err != nil {
- return nil, fmt.Errorf("cannot open service db: %s", err)
- }
- singleton = &Manager{
- executorPool: &sync.Map{},
- serviceBuf: &sync.Map{},
- functionBuf: &sync.Map{},
- etcDir: etcDir,
- serviceStatusInstallKV: statusDb,
- serviceInstallKV: sInstallDb,
- serviceKV: sdb,
- functionKV: fdb,
- }
- }
- if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
- err := singleton.InitByFiles()
- return singleton, err
- }
- return singleton, nil
- }
- func GetManager() *Manager {
- return singleton
- }
- // InitByFiles
- /**
- * This function will parse the service definition json files in etc/services.
- * It will validate all json files and their schemaFiles. If invalid, it just prints
- * an error log and ignore. So it is possible that only valid service definition are
- * parsed and available.
- *
- * NOT threadsafe, must run in lock
- */
- func (m *Manager) InitByFiles() error {
- kconf.Log.Debugf("init service manager")
- files, err := os.ReadDir(m.etcDir)
- if nil != err {
- return err
- }
- // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
- for _, file := range files {
- baseName := filepath.Base(file.Name())
- if filepath.Ext(baseName) == ".json" {
- err := m.initFile(baseName)
- if err != nil {
- kconf.Log.Errorf("%v", err)
- continue
- }
- }
- }
- m.loaded = true
- return nil
- }
- func (m *Manager) initFile(baseName string) error {
- serviceConf := &conf{}
- err := filex.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
- if err != nil {
- return fmt.Errorf("parse services file %s failed: %v", baseName, err)
- }
- // TODO validate serviceConf
- serviceName := baseName[0 : len(baseName)-5]
- info := &serviceInfo{
- About: serviceConf.About,
- Interfaces: make(map[string]*interfaceInfo),
- }
- for name, binding := range serviceConf.Interfaces {
- desc, err := parse(binding.SchemaType, binding.SchemaFile)
- if err != nil {
- return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
- }
- // setting function alias
- aliasMap := make(map[string]string)
- for _, finfo := range binding.Functions {
- aliasMap[finfo.ServiceName] = finfo.Name
- }
- methods := desc.GetFunctions()
- functions := make([]string, len(methods))
- for i, f := range methods {
- fname := f
- if a, ok := aliasMap[f]; ok {
- fname = a
- }
- functions[i] = fname
- }
- info.Interfaces[name] = &interfaceInfo{
- Desc: binding.Description,
- Addr: binding.Address,
- Protocol: binding.Protocol,
- Schema: &schemaInfo{
- SchemaType: binding.SchemaType,
- SchemaFile: binding.SchemaFile,
- },
- Functions: functions,
- Options: binding.Options,
- }
- for i, f := range functions {
- err := m.functionKV.Set(f, &functionContainer{
- ServiceName: serviceName,
- InterfaceName: name,
- MethodName: methods[i],
- })
- if err != nil {
- kconf.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
- }
- }
- }
- err = m.serviceKV.Set(serviceName, info)
- if err != nil {
- return fmt.Errorf("fail to save the parsing result: %v", err)
- }
- return nil
- }
- // Start Implement FunctionFactory
- func (m *Manager) HasFunctionSet(_ string) bool {
- return false
- }
- func (m *Manager) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
- funcContainer, ok := m.getFunction(funcName)
- if ok {
- installScript := ""
- m.serviceInstallKV.Get(funcContainer.ServiceName, &installScript)
- return plugin.SERVICE_EXTENSION, funcContainer.ServiceName, installScript
- } else {
- return plugin.NONE_EXTENSION, "", ""
- }
- }
- func (m *Manager) Function(name string) (api.Function, error) {
- f, ok := m.getFunction(name)
- if !ok {
- return nil, fmt.Errorf("service function %s not found", name)
- }
- s, ok := m.getService(f.ServiceName)
- if !ok {
- return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
- }
- i, ok := s.Interfaces[f.InterfaceName]
- if !ok {
- return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
- }
- // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
- e, err := m.getExecutor(f.InterfaceName, i)
- if err != nil {
- return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
- }
- return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
- }
- func (m *Manager) ConvName(funcName string) (string, bool) {
- _, ok := m.getFunction(funcName)
- return funcName, ok
- }
- // End Implement FunctionFactory
- func (m *Manager) HasService(name string) bool {
- _, ok := m.getService(name)
- kconf.Log.Debugf("found external service %s? %v ", name, ok)
- return ok
- }
- func (m *Manager) getFunction(name string) (*functionContainer, bool) {
- var r *functionContainer
- if t, ok := m.functionBuf.Load(name); ok {
- r = t.(*functionContainer)
- return r, ok
- } else {
- r = &functionContainer{}
- ok, err := m.functionKV.Get(name, r)
- if err != nil {
- kconf.Log.Errorf("failed to get service function %s from kv: %v", name, err)
- return nil, false
- }
- if ok {
- m.functionBuf.Store(name, r)
- }
- return r, ok
- }
- }
- func (m *Manager) getService(name string) (*serviceInfo, bool) {
- var r *serviceInfo
- if t, ok := m.serviceBuf.Load(name); ok {
- r = t.(*serviceInfo)
- return r, ok
- } else {
- r = &serviceInfo{}
- ok, err := m.serviceKV.Get(name, r)
- if err != nil {
- kconf.Log.Errorf("failed to get service %s from kv: %v", name, err)
- return nil, false
- }
- if ok {
- m.serviceBuf.Store(name, r)
- }
- return r, ok
- }
- }
- // Each interface maps to an executor
- func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
- e, ok := m.executorPool.Load(name)
- if !ok {
- ne, err := NewExecutor(info)
- if err != nil {
- return nil, err
- }
- e, _ = m.executorPool.LoadOrStore(name, ne)
- }
- return e.(executor), nil
- }
- func (m *Manager) deleteServiceFuncs(service string) error {
- if s, ok := m.getService(service); ok {
- for _, i := range s.Interfaces {
- for _, f := range i.Functions {
- _ = m.deleteFunc(service, f)
- }
- }
- }
- return nil
- }
- func (m *Manager) deleteFunc(service, name string) error {
- f, err := m.GetFunction(name)
- if err != nil {
- return err
- }
- if f.ServiceName == service {
- m.functionBuf.Delete(name)
- m.functionKV.Delete(name)
- }
- return nil
- }
- // ** CRUD of the service files **
// ServiceCreationRequest is the payload used to create a service.
type ServiceCreationRequest struct {
	// Name is the service name.
	Name string `json:"name"`
	// File is the location the service package is downloaded from.
	File string `json:"file"`
}

// InstallScript returns the request encoded as JSON, or an empty string when
// the encoding fails.
func (s *ServiceCreationRequest) InstallScript() string {
	encoded, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(encoded)
}
- func (m *Manager) List() ([]string, error) {
- return m.serviceKV.Keys()
- }
- func (m *Manager) Create(r *ServiceCreationRequest) error {
- name, uri := r.Name, r.File
- if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
- return fmt.Errorf("service %s exist", name)
- }
- if !httpx.IsValidUrl(uri) {
- return fmt.Errorf("invalid file path %s", uri)
- }
- zipPath := path.Join(m.etcDir, name+".zip")
- // clean up: delete zip file and unzip files in error
- defer os.Remove(zipPath)
- // download
- err := httpx.DownloadFile(zipPath, uri)
- if err != nil {
- return fmt.Errorf("fail to download file %s: %s", uri, err)
- }
- // unzip and copy to destination
- err = m.unzip(name, zipPath)
- if err != nil {
- return err
- }
- // save the install script
- m.serviceInstallKV.Set(name, r.InstallScript())
- // init file to serviceKV
- return m.initFile(name + ".json")
- }
- func (m *Manager) Delete(name string) error {
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- m.deleteServiceFuncs(name)
- m.serviceBuf.Delete(name)
- err := m.serviceKV.Delete(name)
- if err != nil {
- return err
- }
- _ = m.serviceInstallKV.Delete(name)
- path := path.Join(m.etcDir, name+".json")
- err = os.Remove(path)
- if err != nil {
- kconf.Log.Errorf("remove service json fails: %v", err)
- }
- return nil
- }
- func (m *Manager) Get(name string) (*serviceInfo, error) {
- name = strings.Trim(name, " ")
- if name == "" {
- return nil, fmt.Errorf("invalid name %s: should not be empty", name)
- }
- r, ok := m.getService(name)
- if !ok {
- return nil, fmt.Errorf("can't get the service %s", name)
- }
- return r, nil
- }
- func (m *Manager) Update(req *ServiceCreationRequest) error {
- err := m.Delete(req.Name)
- if err != nil {
- return err
- }
- return m.Create(req)
- }
- func (m *Manager) unzip(name, src string) error {
- r, err := zip.OpenReader(src)
- if err != nil {
- return err
- }
- defer r.Close()
- baseName := name + ".json"
- // Try unzip
- found := false
- for _, file := range r.File {
- if strings.EqualFold(file.Name, baseName) {
- found = true
- break
- }
- }
- if !found {
- return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
- }
- // unzip
- for _, file := range r.File {
- err := filex.UnzipTo(file, path.Join(m.etcDir, file.Name))
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (m *Manager) ListFunctions() ([]string, error) {
- return m.functionKV.Keys()
- }
- func (m *Manager) GetFunction(name string) (*functionContainer, error) {
- name = strings.Trim(name, " ")
- if name == "" {
- return nil, fmt.Errorf("invalid name %s: should not be empty", name)
- }
- r, ok := m.getFunction(name)
- if !ok {
- return nil, fmt.Errorf("can't get the service function %s", name)
- }
- return r, nil
- }
- func (m *Manager) GetAllServices() map[string]string {
- all, err := m.serviceInstallKV.All()
- if err != nil {
- return nil
- }
- return all
- }
- func (m *Manager) GetAllServicesStatus() map[string]string {
- all, err := m.serviceStatusInstallKV.All()
- if err != nil {
- return nil
- }
- return all
- }
- func (m *Manager) UninstallAllServices() {
- keys, err := m.serviceInstallKV.Keys()
- if err != nil {
- return
- }
- for _, v := range keys {
- _ = m.Delete(v)
- }
- }
- func (m *Manager) servicesRegisterForImport(_, v string) error {
- req := &ServiceCreationRequest{}
- err := json.Unmarshal([]byte(v), req)
- if err != nil {
- return err
- }
- err = m.Create(req)
- if err != nil {
- return err
- }
- return nil
- }
- func (m *Manager) ImportServices(services map[string]string) map[string]string {
- errMap := map[string]string{}
- _ = m.serviceStatusInstallKV.Clean()
- for k, v := range services {
- err := m.servicesRegisterForImport(k, v)
- if err != nil {
- _ = m.serviceStatusInstallKV.Set(k, err.Error())
- errMap[k] = err.Error()
- }
- }
- return errMap
- }
- func (m *Manager) ImportPartialServices(services map[string]string) map[string]string {
- errMap := map[string]string{}
- for k, v := range services {
- err := m.servicesRegisterForImport(k, v)
- if err != nil {
- errMap[k] = err.Error()
- }
- }
- return errMap
- }
|