123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package connection
- import (
- "fmt"
- "github.com/lf-edge/ekuiper/internal/conf"
- "sync"
- )
- var m = clientManager{
- clientFactory: make(map[string]ClientFactoryFunc),
- lock: sync.Mutex{},
- clientMap: make(map[string]*clientWrapper),
- }
- type clientManager struct {
- lock sync.Mutex
- clientFactory map[string]ClientFactoryFunc
- clientMap map[string]*clientWrapper
- }
- func registerClientFactory(clientType string, creatorFunc ClientFactoryFunc) {
- m.lock.Lock()
- m.clientFactory[clientType] = creatorFunc
- m.lock.Unlock()
- }
- func GetConnection(connectSelector string) (interface{}, error) {
- m.lock.Lock()
- defer m.lock.Unlock()
- var cliWpr *clientWrapper
- var found bool
- cliWpr, found = m.clientMap[connectSelector]
- if found {
- cliWpr.addRef()
- } else {
- selectCfg := &ConSelector{
- ConnSelectorCfg: connectSelector,
- }
- err := selectCfg.Init()
- if err != nil {
- conf.Log.Errorf("connection selector: %s have error %s.", connectSelector, err)
- return nil, err
- }
- clientCreator, ok := m.clientFactory[selectCfg.Type]
- if !ok {
- conf.Log.Errorf("can not find clientCreator for connection selector : %s only support %s", connectSelector, selectCfg.SupportedType)
- return nil, fmt.Errorf("can not find clientCreator for connection selector : %s. only support %s", connectSelector, selectCfg.SupportedType)
- }
- client := clientCreator(selectCfg)
- cliWpr, err = NewClientWrapper(client, selectCfg)
- if err != nil {
- conf.Log.Errorf("can not create client for connection selector : %s have error %s", connectSelector, err)
- return nil, err
- }
- m.clientMap[connectSelector] = cliWpr
- }
- conf.Log.Infof("connection selector: %s GetConnection count %d.", connectSelector, cliWpr.refCnt)
- return cliWpr.getInstance(), nil
- }
- func ReleaseConnection(connectSelector string) {
- m.lock.Lock()
- defer m.lock.Unlock()
- if v, ok := m.clientMap[connectSelector]; ok {
- v.subRef()
- conf.Log.Infof("connection selector: %s ReleaseConnection count %d.", connectSelector, v.refCnt)
- if v.IsRefEmpty() {
- v.clean()
- delete(m.clientMap, connectSelector)
- }
- }
- }
|