client_wrapper.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package connection
  2. import (
  3. "github.com/lf-edge/ekuiper/internal/conf"
  4. )
  5. type ClientFactoryFunc func(super *conf.ConSelector) Client
  6. type Client interface {
  7. CfgValidate(map[string]interface{}) error
  8. GetClient() (interface{}, error)
  9. CloseClient() error
  10. }
  11. type clientWrapper struct {
  12. cli Client
  13. conn interface{}
  14. refCnt uint32
  15. }
  16. func NewClientWrapper(client Client, selector *conf.ConSelector) (*clientWrapper, error) {
  17. props, err := selector.ReadCfgFromYaml()
  18. if err != nil {
  19. return nil, err
  20. }
  21. err = client.CfgValidate(props)
  22. if err != nil {
  23. return nil, err
  24. }
  25. var con interface{}
  26. con, err = client.GetClient()
  27. if err != nil {
  28. return nil, err
  29. }
  30. cliWpr := &clientWrapper{
  31. cli: client,
  32. conn: con,
  33. refCnt: 1,
  34. }
  35. return cliWpr, nil
  36. }
  37. func (c *clientWrapper) addRef() {
  38. c.refCnt = c.refCnt + 1
  39. }
  40. func (c *clientWrapper) subRef() {
  41. c.refCnt = c.refCnt - 1
  42. }
  43. func (c *clientWrapper) IsRefEmpty() bool {
  44. return c.refCnt == 0
  45. }
  46. func (c *clientWrapper) clean() {
  47. _ = c.cli.CloseClient()
  48. }
  49. func (c *clientWrapper) getInstance() interface{} {
  50. return c.conn
  51. }