pool.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package util
  15. import (
  16. "database/sql"
  17. "strings"
  18. "sync"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/xo/dburl"
  21. )
  22. var GlobalPool *driverPool
  23. func init() {
  24. // GlobalPool maintained the *sql.DB group by the driver and DSN.
  25. // Multiple sql sources/sinks can directly fetch the `*sql.DB` from the GlobalPool and return it back when they don't need it anymore.
  26. // As multiple sql sources/sinks share the same `*sql.DB`, we can directly control the total count of connections by using `SetMaxOpenConns`
  27. GlobalPool = newDriverPool()
  28. }
  29. type driverPool struct {
  30. isTesting bool
  31. sync.RWMutex
  32. pool map[string]*dbPool
  33. }
  34. func newDriverPool() *driverPool {
  35. return &driverPool{
  36. pool: map[string]*dbPool{},
  37. }
  38. }
  // dbPool holds the handles opened with a single driver, keyed by DSN,
  // together with a per-DSN count of how many fetches are still outstanding.
  type dbPool struct {
  	// isTesting makes the pool skip opening and closing real databases.
  	isTesting bool
  	// driver is the database/sql driver name used when opening a handle.
  	driver string
  	// The embedded RWMutex is locked by the dbPool methods around their
  	// accesses to the two maps below.
  	sync.RWMutex
  	// pool maps a DSN to its shared handle.
  	pool map[string]*sql.DB
  	// connections maps a DSN to the number of outstanding fetches of its handle.
  	connections map[string]int
  }
  46. func (dp *dbPool) getDBConnCount(dsn string) int {
  47. dp.RLock()
  48. defer dp.RUnlock()
  49. count, ok := dp.connections[dsn]
  50. if ok {
  51. return count
  52. }
  53. return 0
  54. }
  55. func (dp *dbPool) getOrCreate(dsn string) (*sql.DB, error) {
  56. dp.Lock()
  57. defer dp.Unlock()
  58. db, ok := dp.pool[dsn]
  59. if ok {
  60. dp.connections[dsn] = dp.connections[dsn] + 1
  61. return db, nil
  62. }
  63. newDb, err := openDB(dp.driver, dsn, dp.isTesting)
  64. if err != nil {
  65. return nil, err
  66. }
  67. conf.Log.Debugf("create new database instance: %v", dsn)
  68. dp.pool[dsn] = newDb
  69. dp.connections[dsn] = 1
  70. return newDb, nil
  71. }
  72. func openDB(driver, dsn string, isTesting bool) (*sql.DB, error) {
  73. if isTesting {
  74. return nil, nil
  75. }
  76. db, err := sql.Open(driver, dsn)
  77. if err != nil {
  78. return nil, err
  79. }
  80. c := conf.Config
  81. if c != nil && c.Basic.SQLConf != nil && c.Basic.SQLConf.MaxConnections > 0 {
  82. db.SetMaxOpenConns(c.Basic.SQLConf.MaxConnections)
  83. }
  84. return db, nil
  85. }
  86. func (dp *dbPool) closeOneConn(dsn string) error {
  87. dp.Lock()
  88. defer dp.Unlock()
  89. connCount, ok := dp.connections[dsn]
  90. if !ok {
  91. return nil
  92. }
  93. connCount--
  94. if connCount > 0 {
  95. dp.connections[dsn] = connCount
  96. return nil
  97. }
  98. conf.Log.Debugf("drop database instance: %v", dsn)
  99. db := dp.pool[dsn]
  100. // remove db instance from map in order to avoid memory leak
  101. delete(dp.pool, dsn)
  102. delete(dp.connections, dsn)
  103. if dp.isTesting {
  104. return nil
  105. }
  106. return db.Close()
  107. }
  108. func (dp *driverPool) getOrCreate(driver string) *dbPool {
  109. dp.Lock()
  110. defer dp.Unlock()
  111. db, ok := dp.pool[driver]
  112. if ok {
  113. return db
  114. }
  115. newDB := &dbPool{
  116. isTesting: dp.isTesting,
  117. driver: driver,
  118. pool: map[string]*sql.DB{},
  119. connections: map[string]int{},
  120. }
  121. dp.pool[driver] = newDB
  122. return newDB
  123. }
  124. func (dp *driverPool) get(driver string) (*dbPool, bool) {
  125. dp.RLock()
  126. defer dp.RUnlock()
  127. dbPool, ok := dp.pool[driver]
  128. return dbPool, ok
  129. }
  130. func ParseDBUrl(urlstr string) (string, string, error) {
  131. u, err := dburl.Parse(urlstr)
  132. if err != nil {
  133. return "", "", err
  134. }
  135. // Open returns *sql.DB from urlstr
  136. // As we use modernc.org/sqlite with `sqlite` as driver name and dburl use `sqlite3` as driver name, we need to fix it before open sql.DB
  137. if strings.ToLower(u.Driver) == "sqlite3" {
  138. u.Driver = "sqlite"
  139. }
  140. return u.Driver, u.DSN, nil
  141. }
  142. func FetchDBToOneNode(driverPool *driverPool, driver, dsn string) (*sql.DB, error) {
  143. dbPool := driverPool.getOrCreate(driver)
  144. return dbPool.getOrCreate(dsn)
  145. }
  146. func ReturnDBFromOneNode(driverPool *driverPool, driver, dsn string) error {
  147. dbPool, ok := driverPool.get(driver)
  148. if !ok {
  149. return nil
  150. }
  151. return dbPool.closeOneConn(dsn)
  152. }
  153. func getDBConnCount(driverPool *driverPool, driver, dsn string) int {
  154. dbPool := driverPool.getOrCreate(driver)
  155. return dbPool.getDBConnCount(dsn)
  156. }