pool.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package util
  15. import (
  16. "database/sql"
  17. "strings"
  18. "sync"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/xo/dburl"
  21. )
  22. var GlobalPool *dbPool
  23. func init() {
  24. // GlobalPool maintained the *sql.DB group by the driver and DSN.
  25. // Multiple sql sources/sinks can directly fetch the `*sql.DB` from the GlobalPool and return it back when they don't need it anymore.
  26. // As multiple sql sources/sinks share the same `*sql.DB`, we can directly control the total count of connections by using `SetMaxOpenConns`
  27. GlobalPool = newDBPool()
  28. }
  // dbPool is a reference-counted registry of *sql.DB handles keyed by database URL.
  type dbPool struct {
  	// isTesting is passed to openDB when a handle is created and is checked
  	// again before the handle is closed.
  	isTesting bool
  	// The embedded RWMutex is taken by the pool's methods before they touch
  	// the maps below.
  	sync.RWMutex
  	// pool maps a database URL to its shared *sql.DB handle.
  	pool map[string]*sql.DB
  	// connections maps a database URL to the number of callers currently
  	// holding that handle.
  	connections map[string]int
  }
  37. func newDBPool() *dbPool {
  38. return &dbPool{
  39. pool: map[string]*sql.DB{},
  40. connections: map[string]int{},
  41. }
  42. }
  43. func (dp *dbPool) getDBConnCount(url string) int {
  44. dp.RLock()
  45. defer dp.RUnlock()
  46. count, ok := dp.connections[url]
  47. if ok {
  48. return count
  49. }
  50. return 0
  51. }
  52. func (dp *dbPool) getOrCreate(url string) (*sql.DB, error) {
  53. dp.Lock()
  54. defer dp.Unlock()
  55. db, ok := dp.pool[url]
  56. if ok {
  57. dp.connections[url] = dp.connections[url] + 1
  58. return db, nil
  59. }
  60. newDb, err := openDB(url, dp.isTesting)
  61. if err != nil {
  62. return nil, err
  63. }
  64. conf.Log.Debugf("create new database instance: %v", url)
  65. dp.pool[url] = newDb
  66. dp.connections[url] = 1
  67. return newDb, nil
  68. }
  69. func openDB(url string, isTesting bool) (*sql.DB, error) {
  70. if isTesting {
  71. return nil, nil
  72. }
  73. driver, dsn, err := ParseDBUrl(url)
  74. if err != nil {
  75. return nil, err
  76. }
  77. db, err := sql.Open(driver, dsn)
  78. if err != nil {
  79. return nil, err
  80. }
  81. c := conf.Config
  82. if c != nil && c.Basic.SQLConf != nil && c.Basic.SQLConf.MaxConnections > 0 {
  83. db.SetMaxOpenConns(c.Basic.SQLConf.MaxConnections)
  84. }
  85. return db, nil
  86. }
  87. func (dp *dbPool) closeOneConn(url string) error {
  88. dp.Lock()
  89. defer dp.Unlock()
  90. connCount, ok := dp.connections[url]
  91. if !ok {
  92. return nil
  93. }
  94. connCount--
  95. if connCount > 0 {
  96. dp.connections[url] = connCount
  97. return nil
  98. }
  99. conf.Log.Debugf("drop database instance: %v", url)
  100. db := dp.pool[url]
  101. // remove db instance from map in order to avoid memory leak
  102. delete(dp.pool, url)
  103. delete(dp.connections, url)
  104. if dp.isTesting {
  105. return nil
  106. }
  107. return db.Close()
  108. }
  109. func ParseDBUrl(urlstr string) (string, string, error) {
  110. u, err := dburl.Parse(urlstr)
  111. if err != nil {
  112. return "", "", err
  113. }
  114. // Open returns *sql.DB from urlstr
  115. // As we use modernc.org/sqlite with `sqlite` as driver name and dburl use `sqlite3` as driver name, we need to fix it before open sql.DB
  116. if strings.ToLower(u.Driver) == "sqlite3" {
  117. u.Driver = "sqlite"
  118. }
  119. return u.Driver, u.DSN, nil
  120. }
  121. func FetchDBToOneNode(pool *dbPool, url string) (*sql.DB, error) {
  122. return pool.getOrCreate(url)
  123. }
  124. func ReturnDBFromOneNode(pool *dbPool, url string) error {
  125. return pool.closeOneConn(url)
  126. }
  127. func getDBConnCount(pool *dbPool, url string) int {
  128. return pool.getDBConnCount(url)
  129. }