不管是線程池仍是db鏈接池,他們都有一個共同的特性:資源複用,在普通的場景中,咱們使用一個鏈接,它的生命週期多是這樣的: java
一個鏈接,從建立完畢到銷燬,期間只被使用了一次(這裏的一次是指在單個做用域內的使用),當週期結束,另一個調用者仍然須要這個鏈接去作事,就要重複去經歷這種生命週期。由於建立和銷燬都是須要對應的服務消耗時間以及系統資源去處理的,這樣不只浪費了大量的系統資源,並且致使業務響應過程當中都要花費部分時間去重複的建立和銷燬,得不償失,而鏈接池便被賦予瞭解決這種問題的使命!顧名思義,鏈接池中的池字已經很生動形象的闡明瞭它的用意,它用將全部鏈接放入一個"池子"
中統一的去控制鏈接的建立和銷燬,和原始生命週期去對比,鏈接池多瞭如下特性:mysql
所以,當使用鏈接池後,咱們使用一個鏈接的生命週期將會演變成這樣: git
通靈之術 - 傳送門:github.com/ainilili/ho…,DEMO爲Java語言實現!github
事前,咱們須要點支菸分析一下時間一個鏈接池須要作哪些事情:sql
close
以及isClosed
方法。鏈接池不只僅只是對Connection
生命週期的控制,還應該加入一些特點,例如初始鏈接數,最大鏈接數,最小鏈接數、最大空閒時長以及獲取鏈接的等待時長,這些咱們也簡單支持一下。shell
目標以明確,開始動工。安全
要保證線程安全,咱們能夠將目標瞄準在JUC
包下的神通們,設咱們想要的容器爲x
,那麼x
不只須要知足基本的增刪改查功能,並且也要提供獲取超時功能,這是爲了保證當池內長時間沒有空閒鏈接時不會致使業務阻塞,即刻熔斷。另外,x
須要知足雙向操做,這是爲了鏈接池能夠識別出飽和的空閒鏈接,方便回收操做。app
綜上所述,LinkedBlockingDeque
是最合適的選擇,它使用InterruptibleReentrantLock
來保證線程安全,使用Condition
來作獲取元素的阻塞,另外支持雙向操做。ide
另外,咱們能夠將鏈接池拆分爲3個類型:單元測試
其中,工做池和回收池大可沒必要用雙向對列,或許用單向隊列或者Set
均可以代替之:
private LinkedBlockingQueue<HoneycombConnection> workQueue;
private LinkedBlockingDeque<HoneycombConnection> idleQueue;
private LinkedBlockingQueue<HoneycombConnection> freezeQueue;
複製代碼
鏈接池的輸出是Connection
,它表明着一個db鏈接,上游服務使用它作完操做後,會直接調用它的close
方法來釋放鏈接,而咱們必須作的是在調用者無感知的狀況下改變它的關閉邏輯,當調用close
的方法時,咱們將它放回空閒隊列中,保證其的可複用性!
所以,咱們須要對原來的Connection
作裝飾,其作法很簡單,可是很累,這裏新建一個類來實現Connection
接口,經過重寫全部的方法來實現一個**"可編輯"**的Connection
,咱們稱之爲Connection
的裝飾者:
public class HoneycombConnectionDecorator implements Connection{
protected Connection connection;
protected HoneycombConnectionDecorator(Connection connection) {
this.connection = connection;
}
此處省略對方法實現的三百行代碼...
}
複製代碼
以後,咱們須要新建一個本身的Connection
來繼承這個裝飾者,並重寫相應的方法:
public class HoneycombConnection extends HoneycombConnectionDecorator implements HoneycombConnectionSwitcher{
@Override
public void close() { do some things }
@Override
public boolean isClosed() throws SQLException { do some things }
省略...
}
複製代碼
DataSource
是JDK爲了更好的統合和管理數據源而定義出的一個規範,獲取鏈接的入口,方便咱們在這一層更好的擴展數據源(例如增長特殊屬性),使咱們的鏈接池的功能更加豐富,咱們須要實現一個本身的DataSource
能:
public class HoneycombWrapperDatasource implements DataSource{
protected HoneycombDatasourceConfig config;
省略其它方法的實現...
@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return DriverManager.getConnection(config.getUrl(), username, password);
}
省略其它方法的實現...
}
複製代碼
咱們完成了對數據源的實現,可是這裏獲取鏈接的方式是物理建立,咱們須要知足池化的目的,須要重寫HoneycombWrapperDatasource
中的鏈接獲取邏輯,作法是建立一個新的類對父類方法重寫:
public class HoneycombDataSource extends HoneycombWrapperDatasource{
private HoneycombConnectionPool pool;
@Override
public Connection getConnection() throws SQLException {
這裏實現從pool中取出鏈接的邏輯
}
省略...
}
複製代碼
在當前結構體系下,咱們的鏈接池逐漸浮現出了雛形,但遠遠不夠的是,咱們須要在此結構下能夠作自由的擴展,使鏈接池對鏈接的控制更加靈活,所以咱們能夠引入特性這個概念,它容許咱們在其內部訪問鏈接池,並對鏈接池作一系列的擴展操做:
public abstract class AbstractFeature{
public abstract void doing(HoneycombConnectionPool pool);
}
複製代碼
AbstractFeature
抽象父類須要實現doing
方法,咱們能夠在方法內部實現對鏈接池的控制,其中一個典型的例子就是對池中空閒鏈接左回收:
public class CleanerFeature extends AbstractFeature{
@Override
public void doing(HoneycombConnectionPool pool) {
這裏作空閒鏈接的回收
}
}
複製代碼
通過上述分析,要完成一個鏈接池,須要這些模塊的配合,整體流程以下:
在初始化DataSource
以前,咱們須要將各屬性設置進去,這裏使用HoneycombWrapperDatasource
中的HoneycombDatasourceConfig
來承載各屬性:
public class HoneycombDatasourceConfig {
//db url
private String url;
//db user
private String user;
//db password
private String password;
//driver驅動
private String driver;
//初始化鏈接數,默認爲2
private int initialPoolSize = 2;
//最大鏈接數,默認爲10
private int maxPoolSize = 10;
//最小鏈接數,默認爲2
private int minPoolSize = 2;
//獲取鏈接時,最大等待時長,默認爲60s
private long maxWaitTime = 60 * 1000;
//最大空閒時長,超出要被回收,默認爲20s
private long maxIdleTime = 20 * 1000;
//特性列表
private List<AbstractFeature> features;
public HoneycombDatasourceConfig() {
features = new ArrayList<AbstractFeature>(5);
}
省略getter、setter....
複製代碼
設置好屬性以後,咱們須要完成鏈接池的初始化工做,在HoneycombDataSource
的init
方法中實現:
private void init() throws ClassNotFoundException, SQLException {
//阻塞其餘線程初始化操做,等待初始化完成
if(initialStarted || ! (initialStarted = ! initialStarted)) {
if(! initialFinished) {
try {
INITIAL_LOCK.lock();
INITIAL_CONDITION.await();
} catch (InterruptedException e) {
} finally {
INITIAL_LOCK.unlock();
}
}
return;
}
//config參數校驗
config.assertSelf();
Class.forName(getDriver());
//實例化線程池
pool = new HoneycombConnectionPool(config);
//初始化最小鏈接
Integer index = null;
for(int i = 0; i < config.getInitialPoolSize(); i ++) {
if((index = pool.applyIndex()) != null) {
pool.putLeisureConnection(createNativeConnection(pool), index);
}
}
//觸發特性
pool.touchFeatures();
//完成初始化並喚醒其餘阻塞
initialFinished = true;
try {
INITIAL_LOCK.lock();
INITIAL_CONDITION.signalAll();
}catch(Exception e) {
}finally {
INITIAL_LOCK.unlock();
}
}
複製代碼
在init
的方法中,若是initialPoolSize
大於0,會去建立指定數量的物理鏈接放入鏈接池中,建立數量要小於最大鏈接數maxPoolSize
:
public HoneycombConnection createNativeConnection(HoneycombConnectionPool pool) throws SQLException {
return new HoneycombConnection(super.getConnection(), pool);
}
複製代碼
完成初始化後,下一步就是獲取鏈接。
咱們以前將鏈接池分紅了三個,它們分別是空閒池、工做池和回收池。
咱們能夠經過HoneycombDataSource
的getConnection
方法來獲取鏈接,當咱們須要獲取時,首先考慮的是空閒池是否有空閒鏈接,這樣能夠避免建立和激活新的鏈接:
@Override
public Connection getConnection() throws SQLException {
try {
//初始化鏈接池
init();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
HoneycombConnection cn = null;
Integer index = null;
if(pool.assignable()) {
//空閒池可分配,從空閒池取出
cn = pool.getIdleConnection();
}else if(pool.actionable()) {
//回收池可分配,從回收池取出
cn = pool.getFreezeConnection();
}else if((index = pool.applyIndex()) != null) {
//若是鏈接數未滿,建立新的物理鏈接
cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}
if(cn == null) {
//若是沒法獲取鏈接,阻塞等待空閒池鏈接
cn = pool.getIdleConnection();
}
if(cn.isClosedActive()) {
//若是物理鏈接關閉,則獲取新的鏈接
cn.setConnection(super.getConnection());
}
return cn;
}
複製代碼
若是空閒池不可分配,那麼說明鏈接供不該求,也許以前有些空閒鏈接已經被回收(物理關閉),那麼咱們在建立新鏈接以前,能夠到回收池看一下是否存在已回收鏈接,若是存在直接取出:
else if(pool.actionable()) {
//回收池可分配,從回收池取出
cn = pool.getFreezeConnection();
}
複製代碼
若是回收池也不可分配,此時要判斷鏈接池鏈接數量是否已經達到最大鏈接,若是沒有達到,建立新的物理鏈接並直接添加到工做池中:
else if((index = pool.applyIndex()) != null) {
//若是鏈接數未滿,建立新的物理鏈接,添加到工做池
cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}
複製代碼
若是上述三種狀況都不知足,那麼只能從空閒池等待其餘鏈接的釋放:
if(cn == null) {
//若是沒法獲取鏈接,阻塞等待空閒池鏈接
cn = pool.getIdleConnection();
}
複製代碼
具體邏輯封裝在HoneycombConnectionPool
的getIdleConnection
方法中:
public HoneycombConnection getIdleConnection() {
try {
//獲取最大等待時間
long waitTime = config.getMaxWaitTime();
while(waitTime > 0) {
long beginPollNanoTime = System.nanoTime();
//設置超時時間,阻塞等待其餘鏈接的釋放
HoneycombConnection nc = idleQueue.poll(waitTime, TimeUnit.MILLISECONDS);
if(nc != null) {
//狀態轉換
if(nc.isClosed() && nc.switchOccupied() && working(nc)) {
return nc;
}
}
long timeConsuming = (System.nanoTime() - beginPollNanoTime) / (1000 * 1000);
//也許在超時時間內獲取到了鏈接,可是狀態轉換失敗,此時刷新超時時間
waitTime -= timeConsuming;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
}
throw new RuntimeException("獲取鏈接超時");
}
複製代碼
最後,判斷一下鏈接是否被物理關閉,若是是,咱們須要打開新的鏈接替換已經被回收的鏈接:
if(cn.isClosedActive()) {
//若是物理鏈接關閉,則獲取新的鏈接
cn.setConnection(super.getConnection());
}
複製代碼
若是在某段時間內咱們的業務量劇增,那麼須要同時工做的鏈接將會不少,以後過了不久,咱們的業務量降低,那麼以前已經建立的鏈接明顯飽和,這時就須要咱們對其進行回收,咱們能夠經過AbstractFeature
入口操做鏈接池。
對於回收這個操做,咱們經過CleanerFeature
來實現:
public class CleanerFeature extends AbstractFeature{
private Logger logger = LoggerFactory.getLogger(CleanerFeature.class);
public CleanerFeature(boolean enable, long interval) {
//enable表示是否啓用
//interval表示掃描間隔
super(enable, interval);
}
@Override
public void doing(HoneycombConnectionPool pool) {
LinkedBlockingDeque<HoneycombConnection> idleQueue = pool.getIdleQueue();
Thread t = new Thread() {
@Override
public void run() {
while(true) {
try {
//回收掃描間隔
Thread.sleep(interval);
//回收時,空閒池上鎖
synchronized (idleQueue) {
logger.debug("Cleaner Model To Start {}", idleQueue.size());
//回收操做
idleQueue.stream().filter(c -> { return c.idleTime() > pool.getConfig().getMaxIdleTime(); }).forEach(c -> {
try {
if(! c.isClosedActive() && c.idle()) {
c.closeActive();
pool.freeze(c);
}
} catch (SQLException e) {
e.printStackTrace();
}
});
logger.debug("Cleaner Model To Finished {}", idleQueue.size());
}
}catch(Throwable e) {
logger.error("Cleaner happended error", e);
}
}
}
};
t.setDaemon(true);
t.start();
}
}
複製代碼
這裏的操做很簡單,對空閒池加鎖,掃描全部鏈接,釋放空閒時間超過最大空閒時間設置的鏈接,其實這裏只要知道當前鏈接的空閒時長就一目瞭然了,咱們在鏈接放入空閒池時候去刷新他的空閒時間點,那麼當前的空閒時長就等於當前時間減去空閒開始時間:
idleTime = nowTime - idleStartTime
複製代碼
在切換狀態爲空閒時刷新空閒開始時間:
@Override
public boolean switchIdle() {
return unsafe.compareAndSwapObject(this, statusOffset, status, ConnectionStatus.IDLE) && flushIdleStartTime();
}
複製代碼
體驗成果的最快途徑就是投入使用,這裏搞一個單元測試體驗一下:
static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1000, 1000, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
@Test
public void testConcurrence() throws SQLException, InterruptedException{
long start = System.currentTimeMillis();
HoneycombDataSource dataSource = new HoneycombDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&transformedBitIsBoolean=true&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=Asia/Shanghai");
dataSource.setUser("root");
dataSource.setPassword("root");
dataSource.setDriver("com.mysql.cj.jdbc.Driver");
dataSource.setMaxPoolSize(50);
dataSource.setInitialPoolSize(10);
dataSource.setMinPoolSize(10);
dataSource.setMaxWaitTime(60 * 1000);
dataSource.setMaxIdleTime(10 * 1000);
dataSource.addFeature(new CleanerFeature(true, 5 * 1000));
test(dataSource, 10000);
System.out.println(System.currentTimeMillis() - start + " ms");
}
public static void test(DataSource dataSource, int count) throws SQLException, InterruptedException {
CountDownLatch cdl = new CountDownLatch(count);
for(int i = 0; i < count; i ++) {
tpe.execute(() -> {
try {
HoneycombConnection connection = (HoneycombConnection) dataSource.getConnection();
Statement s = connection.createStatement();
s.executeQuery("select * from test limit 1");
connection.close();
}catch(Exception e) {
}finally {
cdl.countDown();
}
});
}
cdl.await();
tpe.shutdown();
}
複製代碼
PC配置:Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz 2.30 GHz 4核8G 512SSD
10000次查詢,耗時:
938 ms
複製代碼
結束語:再次召喚傳送門:github.com/ainilili/ho…