123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- // Copyright 2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package util
- import (
- "database/sql"
- "strings"
- "sync"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/xo/dburl"
- )
- var GlobalPool *driverPool
- func init() {
- // GlobalPool maintained the *sql.DB group by the driver and DSN.
- // Multiple sql sources/sinks can directly fetch the `*sql.DB` from the GlobalPool and return it back when they don't need it anymore.
- // As multiple sql sources/sinks share the same `*sql.DB`, we can directly control the total count of connections by using `SetMaxOpenConns`
- GlobalPool = newDriverPool()
- }
- type driverPool struct {
- isTesting bool
- sync.RWMutex
- pool map[string]*dbPool
- }
- func newDriverPool() *driverPool {
- return &driverPool{
- pool: map[string]*dbPool{},
- }
- }
- type dbPool struct {
- isTesting bool
- driver string
- sync.RWMutex
- pool map[string]*sql.DB
- connections map[string]int
- }
- func (dp *dbPool) getDBConnCount(dsn string) int {
- dp.RLock()
- defer dp.RUnlock()
- count, ok := dp.connections[dsn]
- if ok {
- return count
- }
- return 0
- }
- func (dp *dbPool) getOrCreate(dsn string) (*sql.DB, error) {
- dp.Lock()
- defer dp.Unlock()
- db, ok := dp.pool[dsn]
- if ok {
- dp.connections[dsn] = dp.connections[dsn] + 1
- return db, nil
- }
- newDb, err := openDB(dp.driver, dsn, dp.isTesting)
- if err != nil {
- return nil, err
- }
- conf.Log.Debugf("create new database instance: %v", dsn)
- dp.pool[dsn] = newDb
- dp.connections[dsn] = 1
- return newDb, nil
- }
- func openDB(driver, dsn string, isTesting bool) (*sql.DB, error) {
- if isTesting {
- return nil, nil
- }
- db, err := sql.Open(driver, dsn)
- if err != nil {
- return nil, err
- }
- c := conf.Config
- if c != nil && c.Basic.SQLConf != nil && c.Basic.SQLConf.MaxConnections > 0 {
- db.SetMaxOpenConns(c.Basic.SQLConf.MaxConnections)
- }
- return db, nil
- }
- func (dp *dbPool) closeOneConn(dsn string) error {
- dp.Lock()
- defer dp.Unlock()
- connCount, ok := dp.connections[dsn]
- if !ok {
- return nil
- }
- connCount--
- if connCount > 0 {
- dp.connections[dsn] = connCount
- return nil
- }
- conf.Log.Debugf("drop database instance: %v", dsn)
- db := dp.pool[dsn]
- // remove db instance from map in order to avoid memory leak
- delete(dp.pool, dsn)
- delete(dp.connections, dsn)
- if dp.isTesting {
- return nil
- }
- return db.Close()
- }
- func (dp *driverPool) getOrCreate(driver string) *dbPool {
- dp.Lock()
- defer dp.Unlock()
- db, ok := dp.pool[driver]
- if ok {
- return db
- }
- newDB := &dbPool{
- isTesting: dp.isTesting,
- driver: driver,
- pool: map[string]*sql.DB{},
- connections: map[string]int{},
- }
- dp.pool[driver] = newDB
- return newDB
- }
- func (dp *driverPool) get(driver string) (*dbPool, bool) {
- dp.RLock()
- defer dp.RUnlock()
- dbPool, ok := dp.pool[driver]
- return dbPool, ok
- }
- func ParseDBUrl(urlstr string) (string, string, error) {
- u, err := dburl.Parse(urlstr)
- if err != nil {
- return "", "", err
- }
- // Open returns *sql.DB from urlstr
- // As we use modernc.org/sqlite with `sqlite` as driver name and dburl use `sqlite3` as driver name, we need to fix it before open sql.DB
- if strings.ToLower(u.Driver) == "sqlite3" {
- u.Driver = "sqlite"
- }
- return u.Driver, u.DSN, nil
- }
- func FetchDBToOneNode(driverPool *driverPool, driver, dsn string) (*sql.DB, error) {
- dbPool := driverPool.getOrCreate(driver)
- return dbPool.getOrCreate(dsn)
- }
- func ReturnDBFromOneNode(driverPool *driverPool, driver, dsn string) error {
- dbPool, ok := driverPool.get(driver)
- if !ok {
- return nil
- }
- return dbPool.closeOneConn(dsn)
- }
- func getDBConnCount(driverPool *driverPool, driver, dsn string) int {
- dbPool := driverPool.getOrCreate(driver)
- return dbPool.getDBConnCount(dsn)
- }
|