如何設計並實現一個db鏈接池?

鏈接池的使命!

不管是線程池仍是db鏈接池,他們都有一個共同的特性:資源複用,在普通的場景中,咱們使用一個鏈接,它的生命週期多是這樣的: java

一個鏈接,從建立完畢到銷燬,期間只被使用了一次(這裏的一次是指在單個做用域內的使用),當週期結束,另一個調用者仍然須要這個鏈接去作事,就要重複去經歷這種生命週期。由於建立和銷燬都是須要對應的服務消耗時間以及系統資源去處理的,這樣不只浪費了大量的系統資源,並且致使業務響應過程當中都要花費部分時間去重複的建立和銷燬,得不償失,而鏈接池便被賦予瞭解決這種問題的使命!

鏈接池須要作什麼?

顧名思義,鏈接池中的字已經很生動形象的闡明瞭它的用意,它用將全部鏈接放入一個"池子"中統一的去控制鏈接的建立和銷燬,和原始生命週期去對比,鏈接池多瞭如下特性:mysql

  • 建立並非真的建立,而是從池子中選出空閒鏈接。
  • 銷燬並非真的銷燬,而是將使用中的鏈接放回池中(邏輯關閉)。
  • 真正的建立和銷燬由線程池的特性機制來決定。

所以,當使用鏈接池後,咱們使用一個鏈接的生命週期將會演變成這樣: git

分析計劃

通靈之術 - 傳送門:github.com/ainilili/ho…,DEMO爲Java語言實現!github

事前,咱們須要點支菸分析一下時間一個鏈接池須要作哪些事情:sql

  • 保存鏈接的容器是必不可少的,另外,該容器也要支持鏈接的添加和移除功能,並保證線程安全。
  • 咱們須要由於要對鏈接的銷燬作邏輯調整,咱們須要重寫它的close以及isClosed方法。
  • 咱們須要有個入口對鏈接池作管理,例如回收空閒鏈接。

鏈接池不只僅只是對Connection生命週期的控制,還應該加入一些特點,例如初始鏈接數,最大鏈接數,最小鏈接數、最大空閒時長以及獲取鏈接的等待時長,這些咱們也簡單支持一下。shell

目標以明確,開始動工。安全

鏈接池容器選型

要保證線程安全,咱們能夠將目標瞄準在JUC包下的神通們,設咱們想要的容器爲x,那麼x不只須要知足基本的增刪改查功能,並且也要提供獲取超時功能,這是爲了保證當池內長時間沒有空閒鏈接時不會致使業務阻塞,即刻熔斷。另外,x須要知足雙向操做,這是爲了鏈接池能夠識別出飽和的空閒鏈接,方便回收操做。app

綜上所述,LinkedBlockingDeque是最合適的選擇,它使用InterruptibleReentrantLock來保證線程安全,使用Condition來作獲取元素的阻塞,另外支持雙向操做。ide

另外,咱們能夠將鏈接池拆分爲3個類型:單元測試

  • 工做池:存放正在被使用的鏈接。
  • 空閒池:存放空閒鏈接。
  • 回收池:已經被回收(物理關閉)的鏈接。

其中,工做池回收池大可沒必要用雙向對列,或許用單向隊列或者Set均可以代替之:

private LinkedBlockingQueue<HoneycombConnection> workQueue;
private LinkedBlockingDeque<HoneycombConnection> idleQueue;
private LinkedBlockingQueue<HoneycombConnection> freezeQueue;
複製代碼

Connection的裝飾

鏈接池的輸出是Connection,它表明着一個db鏈接,上游服務使用它作完操做後,會直接調用它的close方法來釋放鏈接,而咱們必須作的是在調用者無感知的狀況下改變它的關閉邏輯,當調用close的方法時,咱們將它放回空閒隊列中,保證其的可複用性!

所以,咱們須要對原來的Connection作裝飾,其作法很簡單,可是很累,這裏新建一個類來實現Connection接口,經過重寫全部的方法來實現一個**"可編輯"**的Connection,咱們稱之爲Connection的裝飾者:

public class HoneycombConnectionDecorator implements Connection{

    protected Connection connection;
    
    protected HoneycombConnectionDecorator(Connection connection) {
        this.connection = connection;
    }
    
    此處省略對方法實現的三百行代碼...
}
複製代碼

以後,咱們須要新建一個本身的Connection來繼承這個裝飾者,並重寫相應的方法:

public class HoneycombConnection extends HoneycombConnectionDecorator implements HoneycombConnectionSwitcher{
    @Override
    public void close() { do some things }

    @Override
    public boolean isClosed() throws SQLException { do some things }    
    
    省略...
}
複製代碼

DataSource的重寫

DataSource是JDK爲了更好的統合和管理數據源而定義出的一個規範,獲取鏈接的入口,方便咱們在這一層更好的擴展數據源(例如增長特殊屬性),使咱們的鏈接池的功能更加豐富,咱們須要實現一個本身的DataSource能:

public class HoneycombWrapperDatasource implements DataSource{
    protected HoneycombDatasourceConfig config;
    省略其它方法的實現...
    @Override
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return DriverManager.getConnection(config.getUrl(), username, password);
    }
    省略其它方法的實現...
}
複製代碼

咱們完成了對數據源的實現,可是這裏獲取鏈接的方式是物理建立,咱們須要知足池化的目的,須要重寫HoneycombWrapperDatasource中的鏈接獲取邏輯,作法是建立一個新的類對父類方法重寫:

public class HoneycombDataSource extends HoneycombWrapperDatasource{
    private HoneycombConnectionPool pool;
    @Override
    public Connection getConnection() throws SQLException {
        這裏實現從pool中取出鏈接的邏輯
    }
    省略...
}
複製代碼

特性擴展

在當前結構體系下,咱們的鏈接池逐漸浮現出了雛形,但遠遠不夠的是,咱們須要在此結構下能夠作自由的擴展,使鏈接池對鏈接的控制更加靈活,所以咱們能夠引入特性這個概念,它容許咱們在其內部訪問鏈接池,並對鏈接池作一系列的擴展操做:

public abstract class AbstractFeature{
    public abstract void doing(HoneycombConnectionPool pool);
}
複製代碼

AbstractFeature抽象父類須要實現doing方法,咱們能夠在方法內部實現對鏈接池的控制,其中一個典型的例子就是對池中空閒鏈接左回收:

public class CleanerFeature extends AbstractFeature{
    @Override
    public void doing(HoneycombConnectionPool pool) {
        這裏作空閒鏈接的回收
    }
}
複製代碼

落實計劃

通過上述分析,要完成一個鏈接池,須要這些模塊的配合,整體流程以下:

第一步:設置數據源屬性

在初始化DataSource以前,咱們須要將各屬性設置進去,這裏使用HoneycombWrapperDatasource中的HoneycombDatasourceConfig來承載各屬性:

public class HoneycombDatasourceConfig {

    //db url
    private String url;

    //db user
    private String user;

    //db password
    private String password;

    //driver驅動
    private String driver;

    //初始化鏈接數,默認爲2
    private int initialPoolSize = 2;

    //最大鏈接數,默認爲10
    private int maxPoolSize = 10;

    //最小鏈接數,默認爲2
    private int minPoolSize = 2;
    
    //獲取鏈接時,最大等待時長,默認爲60s
    private long maxWaitTime = 60 * 1000;

    //最大空閒時長,超出要被回收,默認爲20s
    private long maxIdleTime = 20 * 1000;
    
    //特性列表
    private List<AbstractFeature> features;
    
    public HoneycombDatasourceConfig() {
        features = new ArrayList<AbstractFeature>(5);
    }
    
    省略getter、setter....
複製代碼

第二步:初始化鏈接池

設置好屬性以後,咱們須要完成鏈接池的初始化工做,在HoneycombDataSourceinit方法中實現:

private void init() throws ClassNotFoundException, SQLException {
    //阻塞其餘線程初始化操做,等待初始化完成
    if(initialStarted || ! (initialStarted = ! initialStarted)) {
        if(! initialFinished) {
            try {
                INITIAL_LOCK.lock();
                INITIAL_CONDITION.await();
            } catch (InterruptedException e) {
            } finally {
                INITIAL_LOCK.unlock();
            }
        }
        return;
    }
    
    //config參數校驗
    config.assertSelf();
    
    Class.forName(getDriver());
    
    //實例化線程池
    pool = new HoneycombConnectionPool(config);
    
    //初始化最小鏈接
    Integer index = null;
    for(int i = 0; i < config.getInitialPoolSize(); i ++) {
        if((index =  pool.applyIndex()) != null) {
            pool.putLeisureConnection(createNativeConnection(pool), index);
        }
    }
    
    //觸發特性
    pool.touchFeatures();
    
    //完成初始化並喚醒其餘阻塞
    initialFinished = true;
    try {
        INITIAL_LOCK.lock();
        INITIAL_CONDITION.signalAll();
    }catch(Exception e) {
    }finally {
        INITIAL_LOCK.unlock();
    }
}
複製代碼

第三步:建立初始鏈接

init的方法中,若是initialPoolSize大於0,會去建立指定數量的物理鏈接放入鏈接池中,建立數量要小於最大鏈接數maxPoolSize

public HoneycombConnection createNativeConnection(HoneycombConnectionPool pool) throws SQLException {
    return new HoneycombConnection(super.getConnection(), pool);
}
複製代碼

完成初始化後,下一步就是獲取鏈接。

第四步:從空閒池獲取

咱們以前將鏈接池分紅了三個,它們分別是空閒池工做池回收池

咱們能夠經過HoneycombDataSourcegetConnection方法來獲取鏈接,當咱們須要獲取時,首先考慮的是空閒池是否有空閒鏈接,這樣能夠避免建立和激活新的鏈接:

@Override
public Connection getConnection() throws SQLException {
    try {
    	//初始化鏈接池
        init();
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
    
    HoneycombConnection cn = null;
    Integer index = null;
    
    if(pool.assignable()) {
    	//空閒池可分配,從空閒池取出
        cn = pool.getIdleConnection();
    }else if(pool.actionable()) {
    	//回收池可分配,從回收池取出
        cn = pool.getFreezeConnection();
    }else if((index =  pool.applyIndex()) != null) {
    	//若是鏈接數未滿,建立新的物理鏈接
        cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
    }
    
    if(cn == null) {
    	//若是沒法獲取鏈接,阻塞等待空閒池鏈接
        cn = pool.getIdleConnection();
    }
    
    if(cn.isClosedActive()) {
    	//若是物理鏈接關閉,則獲取新的鏈接
        cn.setConnection(super.getConnection());
    }
    return cn;
}
複製代碼

第五步:從回收池獲取

若是空閒池不可分配,那麼說明鏈接供不該求,也許以前有些空閒鏈接已經被回收(物理關閉),那麼咱們在建立新鏈接以前,能夠到回收池看一下是否存在已回收鏈接,若是存在直接取出:

else if(pool.actionable()) {
	//回收池可分配,從回收池取出
    cn = pool.getFreezeConnection();
}
複製代碼

第六步:建立新的鏈接

若是回收池也不可分配,此時要判斷鏈接池鏈接數量是否已經達到最大鏈接,若是沒有達到,建立新的物理鏈接並直接添加到工做池中:

else if((index =  pool.applyIndex()) != null) {
	//若是鏈接數未滿,建立新的物理鏈接,添加到工做池
    cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}
複製代碼

第七步:等待空閒池的鏈接

若是上述三種狀況都不知足,那麼只能從空閒池等待其餘鏈接的釋放:

if(cn == null) {
	//若是沒法獲取鏈接,阻塞等待空閒池鏈接
    cn = pool.getIdleConnection();
}
複製代碼

具體邏輯封裝在HoneycombConnectionPoolgetIdleConnection方法中:

public HoneycombConnection getIdleConnection() {
    try {
    	//獲取最大等待時間
        long waitTime = config.getMaxWaitTime();
        while(waitTime > 0) {
            long beginPollNanoTime = System.nanoTime();
            
            //設置超時時間,阻塞等待其餘鏈接的釋放
            HoneycombConnection nc = idleQueue.poll(waitTime, TimeUnit.MILLISECONDS);
            if(nc != null) {
            	//狀態轉換
                if(nc.isClosed() && nc.switchOccupied() && working(nc)) {
                    return nc;
                }
            }
            long timeConsuming = (System.nanoTime() - beginPollNanoTime) / (1000 * 1000);
            
            //也許在超時時間內獲取到了鏈接,可是狀態轉換失敗,此時刷新超時時間
            waitTime -= timeConsuming;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
    }
    throw new RuntimeException("獲取鏈接超時");
}
複製代碼

第八步:激活鏈接

最後,判斷一下鏈接是否被物理關閉,若是是,咱們須要打開新的鏈接替換已經被回收的鏈接:

if(cn.isClosedActive()) {
	//若是物理鏈接關閉,則獲取新的鏈接
    cn.setConnection(super.getConnection());
}
複製代碼

鏈接的回收

若是在某段時間內咱們的業務量劇增,那麼須要同時工做的鏈接將會不少,以後過了不久,咱們的業務量降低,那麼以前已經建立的鏈接明顯飽和,這時就須要咱們對其進行回收,咱們能夠經過AbstractFeature入口操做鏈接池。

對於回收這個操做,咱們經過CleanerFeature來實現:

public class CleanerFeature extends AbstractFeature{

    private Logger logger = LoggerFactory.getLogger(CleanerFeature.class);

    public CleanerFeature(boolean enable, long interval) {
       //enable表示是否啓用
       //interval表示掃描間隔
       super(enable, interval);
    }

    @Override
    public void doing(HoneycombConnectionPool pool) {
        LinkedBlockingDeque<HoneycombConnection> idleQueue = pool.getIdleQueue();
        Thread t = new Thread() {
            @Override
            public void run() {
                while(true) {
                    try {
                        //回收掃描間隔
                    	Thread.sleep(interval);
                        
                    	//回收時,空閒池上鎖
                        synchronized (idleQueue) {
                            logger.debug("Cleaner Model To Start {}", idleQueue.size());
                            //回收操做
                            idleQueue.stream().filter(c -> { return c.idleTime() > pool.getConfig().getMaxIdleTime(); }).forEach(c -> {
                                try {
                                    if(! c.isClosedActive() && c.idle()) {
                                        c.closeActive();
                                        pool.freeze(c);
                                    }
                                } catch (SQLException e) {
                                    e.printStackTrace();
                                } 
                            });
                            logger.debug("Cleaner Model To Finished {}", idleQueue.size());
                        }
                    }catch(Throwable e) {
                        logger.error("Cleaner happended error", e);
                    }
                }
            }
        };
        t.setDaemon(true);
        t.start();
    }
}
複製代碼

這裏的操做很簡單,對空閒池加鎖,掃描全部鏈接,釋放空閒時間超過最大空閒時間設置的鏈接,其實這裏只要知道當前鏈接的空閒時長就一目瞭然了,咱們在鏈接放入空閒池時候去刷新他的空閒時間點,那麼當前的空閒時長就等於當前時間減去空閒開始時間:

idleTime = nowTime - idleStartTime
複製代碼

在切換狀態爲空閒時刷新空閒開始時間:

@Override
public boolean switchIdle() {
    return unsafe.compareAndSwapObject(this, statusOffset, status, ConnectionStatus.IDLE) && flushIdleStartTime();
}
複製代碼

測試一下

體驗成果的最快途徑就是投入使用,這裏搞一個單元測試體驗一下:

static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1000, 1000, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    
@Test
public void testConcurrence() throws SQLException, InterruptedException{
    long start = System.currentTimeMillis();
    HoneycombDataSource dataSource = new HoneycombDataSource();
    dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&transformedBitIsBoolean=true&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=Asia/Shanghai");
    dataSource.setUser("root");
    dataSource.setPassword("root");
    dataSource.setDriver("com.mysql.cj.jdbc.Driver");
    dataSource.setMaxPoolSize(50);
    dataSource.setInitialPoolSize(10);
    dataSource.setMinPoolSize(10);
    dataSource.setMaxWaitTime(60 * 1000);
    dataSource.setMaxIdleTime(10 * 1000);
    dataSource.addFeature(new CleanerFeature(true, 5 * 1000));
    
    test(dataSource, 10000);
    System.out.println(System.currentTimeMillis() - start + " ms");
}

public static void test(DataSource dataSource, int count) throws SQLException, InterruptedException {
    CountDownLatch cdl = new CountDownLatch(count);
    for(int i = 0; i < count; i ++) {
        tpe.execute(() -> {
            try {
                HoneycombConnection connection = (HoneycombConnection) dataSource.getConnection();
                Statement s = connection.createStatement();
                s.executeQuery("select * from test limit 1");
                connection.close();
            }catch(Exception e) {
            }finally {
                cdl.countDown();
            }
        });
    }
    cdl.await();
    tpe.shutdown();
}
複製代碼

PC配置:Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz 2.30 GHz 4核8G 512SSD

10000次查詢,耗時:

938 ms
複製代碼

結束語:再次召喚傳送門:github.com/ainilili/ho…

相關文章
相關標籤/搜索