manager.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package connection
  2. import (
  3. "fmt"
  4. "github.com/lf-edge/ekuiper/internal/conf"
  5. "sync"
  6. )
  7. var m = clientManager{
  8. clientFactory: make(map[string]ClientFactoryFunc),
  9. lock: sync.Mutex{},
  10. clientMap: make(map[string]*clientWrapper),
  11. }
  12. type clientManager struct {
  13. lock sync.Mutex
  14. clientFactory map[string]ClientFactoryFunc
  15. clientMap map[string]*clientWrapper
  16. }
  17. func registerClientFactory(clientType string, creatorFunc ClientFactoryFunc) {
  18. m.lock.Lock()
  19. m.clientFactory[clientType] = creatorFunc
  20. m.lock.Unlock()
  21. }
  22. func GetConnection(connectSelector string) (interface{}, error) {
  23. m.lock.Lock()
  24. defer m.lock.Unlock()
  25. var cliWpr *clientWrapper
  26. var found bool
  27. cliWpr, found = m.clientMap[connectSelector]
  28. if found {
  29. cliWpr.addRef()
  30. } else {
  31. selectCfg := &ConSelector{
  32. ConnSelectorCfg: connectSelector,
  33. }
  34. err := selectCfg.Init()
  35. if err != nil {
  36. conf.Log.Errorf("connection selector: %s have error %s.", connectSelector, err)
  37. return nil, err
  38. }
  39. clientCreator, ok := m.clientFactory[selectCfg.Type]
  40. if !ok {
  41. conf.Log.Errorf("can not find clientCreator for connection selector : %s only support %s", connectSelector, selectCfg.SupportedType)
  42. return nil, fmt.Errorf("can not find clientCreator for connection selector : %s. only support %s", connectSelector, selectCfg.SupportedType)
  43. }
  44. client := clientCreator(selectCfg)
  45. cliWpr, err = NewClientWrapper(client, selectCfg)
  46. if err != nil {
  47. conf.Log.Errorf("can not create client for connection selector : %s have error %s", connectSelector, err)
  48. return nil, err
  49. }
  50. m.clientMap[connectSelector] = cliWpr
  51. }
  52. conf.Log.Infof("connection selector: %s GetConnection count %d.", connectSelector, cliWpr.refCnt)
  53. return cliWpr.getInstance(), nil
  54. }
  55. func ReleaseConnection(connectSelector string) {
  56. m.lock.Lock()
  57. defer m.lock.Unlock()
  58. if v, ok := m.clientMap[connectSelector]; ok {
  59. v.subRef()
  60. conf.Log.Infof("connection selector: %s ReleaseConnection count %d.", connectSelector, v.refCnt)
  61. if v.IsRefEmpty() {
  62. v.clean()
  63. delete(m.clientMap, connectSelector)
  64. }
  65. }
  66. }