上一篇文章重點介紹了一下Java的Future模式,最後意淫了一個數據庫鏈接池的場景。本想經過Future模式來防止,當多個線程同時獲取數據庫鏈接時各自都生成一個,形成資源浪費。可是忽略了一個根本的功能,就是多個線程同時調用get方法時,獲得的是同一個數據庫鏈接的多個引用,這會致使嚴重的問題。數據庫
因此,我抽空看了看呼聲很高的Druid的數據庫鏈接池實現,固然關注點主要是多線程方面的處理。我以爲,帶着問題去看源碼是一種很好的思考方式。數組
Druid不只僅是一個數據庫鏈接池,還有不少標籤,好比統計監控、過濾器、SQL解析等。既然要分析鏈接池,那先看看DruidDataSource類緩存
getConnection方法的實現:數據結構
@Override public DruidPooledConnection getConnection() throws SQLException { return getConnection(maxWait); } public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
返回的是一個DruidPooledConnection,這個類後面再說;另外這裏傳入了一個long類型maxWait,應該是用來作超時處理的;init方法在getConnection方法裏面調用,這也是一種很好的設計;裏面的過濾器鏈的處理就很少說了。多線程
public void init() throws SQLException { if (inited) { return; } final ReentrantLock lock = this.lock; // 使用lock而不是synchronized try { lock.lockInterruptibly(); } catch (InterruptedException e) { throw new SQLException("interrupt", e); } boolean init = false; try { if (inited) { return; } init = true; connections = new DruidConnectionHolder[maxActive]; // 數組 try { // init connections for (int i = 0, size = getInitialSize(); i < size; ++i) { Connection conn = createPhysicalConnection(); // 生成真正的數據庫鏈接 DruidConnectionHolder holder = new DruidConnectionHolder(this, conn); connections[poolingCount] = holder; incrementPoolingCount(); } if (poolingCount > 0) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } } catch (SQLException ex) { LOG.error("init datasource error, url: " + this.getUrl(), ex); connectError = ex; } createAndLogThread(); createAndStartCreatorThread(); createAndStartDestroyThread(); initedLatch.await(); initedTime = new Date(); registerMbean(); if (connectError != null && poolingCount == 0) { throw connectError; } } catch (SQLException e) { LOG.error("dataSource init error", e); throw e; } catch (InterruptedException e) { throw new SQLException(e.getMessage(), e); } finally { inited = true; lock.unlock(); // 釋放鎖 if (init && LOG.isInfoEnabled()) { LOG.info("{dataSource-" + this.getID() + "} inited"); } } }
我這裏作了刪減,加了一些簡單的註釋。經過這個方法,正好複習一下以前寫的那些知識點,若是感興趣,能夠看看我以前寫的文章。ide
這裏使用了lock,而且保證只會被執行一次。根據初始容量,先生成了一批數據庫鏈接,用一個數組connections存放這些鏈接的引用,並且專門定義了一個變量poolingCount來保存這些鏈接的總數量。ui
看到initedLatch.await有一種似曾相識的感受this
private final CountDownLatch initedLatch = new CountDownLatch(2);
這裏調用了await方法,那countDown方法在哪些線程裏面被調用呢url
protected void createAndStartCreatorThread() { if (createScheduler == null) { String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); createConnectionThread = new CreateConnectionThread(threadName); createConnectionThread.start(); return; } initedLatch.countDown(); }
這裏先判斷createScheduler這個調度線程池是否被設置,若是沒有設置,直接countDown;不然,就開啓一個建立數據庫鏈接的線程,固然這個線程的run方法仍是會調用countDown方法。可是這裏我有一個疑問:開啓建立鏈接的線程,爲何必定要有一個調度線程池呢???spa
難道是當數據庫鏈接建立失敗的時候,須要過了指定時間後,再重試?這麼理解好像有點牽強,但願高人來評論。
還有就是,當開啓destroy線程的時候也會調用countDown方法。
接着在看getConnection方法,一直調用到getConnectionInternal方法
DruidConnectionHolder holder; try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCount.incrementAndGet(); throw new SQLException("interrupt", e); } try { if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); } } catch (InterruptedException e) { connectErrorCount.incrementAndGet(); throw new SQLException(e.getMessage(), e); } catch (SQLException e) { connectErrorCount.incrementAndGet(); throw e; } finally { lock.unlock(); } holder.incrementUseCount(); DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); return poolalbeConnection;
我這裏仍是作了刪減。大致邏輯是:先從鏈接池中取出DruidConnectionHolder,而後再封裝成DruidPooledConnection對象返回。再看看取holder的方法:
DruidConnectionHolder takeLast() throws InterruptedException, SQLException { try { while (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { notEmpty.await(); // signal by recycle or creator } finally { notEmptyWaitThreadCount--; } notEmptyWaitCount++; if (!enable) { connectErrorCount.incrementAndGet(); throw new DataSourceDisableException(); } } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } decrementPoolingCount(); DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; return last; }
這個方法很是好的詮釋了Lock-Condition的使用場景,幾行綠色的註釋解釋的很明白了,若是對empty和notEmpty看不太懂,能夠去看看我以前寫的那篇文章。
這個方法的邏輯:先判斷池中的鏈接數,若是到0了,那麼本線程就得被掛起,同時釋放empty信號,而且等待notEmpty的信號。若是還有鏈接,就取出數組的最後一個,同時更改poolingCount。
到這裏,基本理解了Druid數據庫鏈接池獲取鏈接的實現流程。可是,若是不去看看裏面的數據結構,仍是會一頭霧水。咱們就看看幾個基本的類,以及它們之間的持有關係。
一、DruidDataSource持有一個DruidConnectionHolder的數組,保存全部的數據庫鏈接
private volatile DruidConnectionHolder[] connections; // 注意這裏的volatile
二、DruidConnectionHolder持有數據庫鏈接,還有所在的DataSource等
private final DruidAbstractDataSource dataSource; private final Connection conn;
三、DruidPooledConnection持有DruidConnectionHolder,所在線程等
protected volatile DruidConnectionHolder holder; private final Thread ownerThread;
對於這種設計,我很好奇爲何要添加一層holder作封裝,數組裏直接存放Connection好像也何嘗不可。
其實,這麼設計是有道理的。好比說,一個Connection對象能夠產生多個Statement對象,當咱們想同時保存Connection和對應的多個Statement的時候,就比較糾結。
再看看DruidConnectionHolder的成員變量
private PreparedStatementPool statementPool; private final List<Statement> statementTrace = new ArrayList<Statement>(2);
這樣的話,既能夠作緩存,也能夠作統計。
最終咱們對Connection的操做都是經過DruidPooledConnection來實現,好比commit、rollback等,它們大都是經過實際的數據庫鏈接完成工做。而我比較關心的是close方法的實現,close方法最核心的邏輯是recycle方法:
public void recycle() throws SQLException { if (this.disable) { return; } DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } if (!this.abandoned) { DruidAbstractDataSource dataSource = holder.getDataSource(); dataSource.recycle(this); } this.holder = null; conn = null; transactionInfo = null; closed = true; }
經過最後幾行代碼,可以看出,並無調用實際數據庫鏈接的close方法,而只是斷開了以前那張圖裏面的4號引用。用這種方式,來實現數據庫鏈接的複用。