在作服務化拆分的時候,若不是性能要求特別高的場景,咱們通常對外暴露Http
服務。Spring
裏提供了一個模板類RestTemplate
,經過配置RestTemplate
,咱們能夠快速地訪問外部的Http
服務。Http
底層是經過Tcp
的三次握手創建鏈接的,若每一個請求都要從新創建鏈接,那開銷是很大的,特別是對於消息體很是小的場景,開銷更大。segmentfault
若使用鏈接池的方式,來管理鏈接對象,能極大地提升服務的吞吐量。設計模式
RestTemplate
底層是封裝了HttpClient
(筆者的版本是4.3.6),它提供了鏈接池機制來處理高併發網絡請求。安全
一般,咱們採用以下的樣板代碼來構建HttpClient
:網絡
HttpClientBuilder builder = HttpClientBuilder.create(); builder.setMaxConnTotal(maxConnections).setMaxConnPerRoute(maxConnectionsPerRoute); if (!connectionReuse) { builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE); } if (!automaticRetry) { builder.disableAutomaticRetries(); } if (!compress) { builder.disableContentCompression(); } HttpClient httpClient = builder.build();
從上面的代碼能夠看出,HttpClient
使用建造者設計模式來構造對象,最後一行代碼構建對象,前面的代碼是用來設置客戶端的最大鏈接數、單路由最大鏈接數、是否使用長鏈接、壓縮等特性。併發
咱們進入HttpClientBuilder的build()
方法,會看到以下代碼:ide
# 構造Http鏈接池管理器 final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager( RegistryBuilder.<ConnectionSocketFactory>create() .register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("https", sslSocketFactory) .build()); if (defaultSocketConfig != null) { poolingmgr.setDefaultSocketConfig(defaultSocketConfig); } if (defaultConnectionConfig != null) { poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig); } if (systemProperties) { String s = System.getProperty("http.keepAlive", "true"); if ("true".equalsIgnoreCase(s)) { s = System.getProperty("http.maxConnections", "5"); final int max = Integer.parseInt(s); poolingmgr.setDefaultMaxPerRoute(max); poolingmgr.setMaxTotal(2 * max); } } if (maxConnTotal > 0) { poolingmgr.setMaxTotal(maxConnTotal); } if (maxConnPerRoute > 0) { poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute); } # Http鏈接管理器採用鏈接池的方式實現 connManager = poolingmgr;
默認狀況下構造出的Http
鏈接管理器是採用鏈接池的方式實現的。函數
咱們進入 PoolingHttpClientConnectionManager
的代碼,其鏈接池的核心實現是依賴於 CPool
類,而 CPool
又繼承了抽象類AbstractConnPool
, AbstractConnPool
有@ThreadSafe
的註解,說明它是線程安全類,因此 HttpClient
線程安全地獲取、釋放鏈接都依賴於 AbstractConnPool
。高併發
接下來我來看最核心的AbstractConnPool
類,如下是鏈接池的結構圖:oop
鏈接池最重要的兩個公有方法是 lease
和release
,即獲取鏈接和釋放鏈接的兩個方法。源碼分析
@Override public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) { Args.notNull(route, "Route"); Asserts.check(!this.isShutDown, "Connection pool shut down"); return new PoolEntryFuture<E>(this.lock, callback) { @Override public E getPoolEntry( final long timeout, final TimeUnit tunit) throws InterruptedException, TimeoutException, IOException { final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this); onLease(entry); return entry; } }; }
lease
方法返回的是一個 Future
對象,即須要調用 Future
的get
方法,才能夠獲得PoolEntry
的對象,它包含了一個鏈接的具體信息。
而獲取鏈接是經過 getPoolEntryBlocking
方法實現的,經過函數名能夠知道,這是一個阻塞的方法,即該route
所對應的鏈接池中的鏈接不夠用時,該方法就會阻塞,直到該 route
所對應的鏈接池有鏈接釋放,方法纔會被喚醒;或者方法一直等待,直到鏈接超時拋出異常。
private E getPoolEntryBlocking( final T route, final Object state, final long timeout, final TimeUnit tunit, final PoolEntryFuture<E> future) throws IOException, InterruptedException, TimeoutException { Date deadline = null; // 設置鏈接超時時間戳 if (timeout > 0) { deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout)); } // 獲取鏈接,並修改修改鏈接池,因此加鎖--->線程安全 this.lock.lock(); try { // 從Map中獲取該route對應的鏈接池,若Map中沒有,則建立該route對應的鏈接池 final RouteSpecificPool<T, C, E> pool = getPool(route); E entry = null; while (entry == null) { Asserts.check(!this.isShutDown, "Connection pool shut down"); for (;;) { // 獲取 同一狀態的 空閒鏈接,即從available鏈表的頭部中移除,添加到leased集合中 entry = pool.getFree(state); // 若返回鏈接爲空,跳出循環 if (entry == null) { break; } // 若鏈接已過時,則關閉鏈接 if (entry.isExpired(System.currentTimeMillis())) { entry.close(); } else if (this.validateAfterInactivity > 0) { if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) { if (!validate(entry)) { entry.close(); } } } if (entry.isClosed()) { // 若該鏈接已關閉,則總的available鏈表中刪除該鏈接 this.available.remove(entry); // 從該route對應的鏈接池的leased集合中刪除該鏈接,而且不回收到available鏈表中 pool.free(entry, false); } else { break; } } // 跳出for循環 if (entry != null) { // 若獲取的鏈接不爲空,將鏈接從總的available鏈表移除,並添加到leased集合中 // 獲取鏈接成功,直接返回 this.available.remove(entry); this.leased.add(entry); onReuse(entry); return entry; } // 計算該route的最大鏈接數 // New connection is needed final int maxPerRoute = getMax(route); // Shrink the pool prior to allocating a new connection // 計算該route鏈接池中的鏈接數 是否 大於等於 route最大鏈接數 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); // 若大於等於 route最大鏈接數,則收縮該route的鏈接池 if (excess > 0) { for (int i = 0; i < excess; i++) { // 獲取該route鏈接池中最不經常使用的空閒鏈接,即available鏈表末尾的鏈接 // 由於回收鏈接時,老是將鏈接添加到available鏈表的頭部,因此鏈表尾部的鏈接是最有可能過時的 final E lastUsed = pool.getLastUsed(); if (lastUsed == null) { break; } // 關閉鏈接,並從總的空閒鏈表以及route對應的鏈接池中刪除 lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); } } // 該route的鏈接池大小 小於 route最大鏈接數 if (pool.getAllocatedCount() < maxPerRoute) { final int totalUsed = this.leased.size(); final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); if (freeCapacity > 0) { final int totalAvailable = this.available.size(); // 總的空閒鏈接數 大於等於 總的鏈接池剩餘容量 if (totalAvailable > freeCapacity - 1) { if (!this.available.isEmpty()) { // 從總的available鏈表中 以及 route對應的鏈接池中 刪除鏈接,並關閉鏈接 final E lastUsed = this.available.removeLast(); lastUsed.close(); final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } } // 建立新鏈接,並添加到總的leased集合以及route鏈接池的leased集合中,函數返回 final C conn = this.connFactory.create(route); entry = pool.add(conn); this.leased.add(entry); return entry; } } //route的鏈接池已滿,沒法分配鏈接 boolean success = false; try { // 將該獲取鏈接的任務放入pending隊列 pool.queue(future); this.pending.add(future); // 阻塞等待,若在超時以前被喚醒,則返回true;若直到超時才返回,則返回false success = future.await(deadline); } finally { // In case of 'success', we were woken up by the // connection pool and should now have a connection // waiting for us, or else we're shutting down. // Just continue in the loop, both cases are checked. // 不管是 被喚醒返回、超時返回 仍是被 中斷異常返回,都會進入finally代碼段 // 從pending隊列中移除 pool.unqueue(future); this.pending.remove(future); } // check for spurious wakeup vs. timeout // 判斷是僞喚醒 仍是 鏈接超時 // 如果 鏈接超時,則跳出while循環,並拋出 鏈接超時的異常; // 如果 僞喚醒,則繼續循環獲取鏈接 if (!success && (deadline != null) && (deadline.getTime() <= System.currentTimeMillis())) { break; } } throw new TimeoutException("Timeout waiting for connection"); } finally { // 釋放鎖 this.lock.unlock(); } }
@Override public void release(final E entry, final boolean reusable) { // 獲取鎖 this.lock.lock(); try { // 從總的leased集合中移除鏈接 if (this.leased.remove(entry)) { final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute()); // 回收鏈接 pool.free(entry, reusable); if (reusable && !this.isShutDown) { this.available.addFirst(entry); onRelease(entry); } else { entry.close(); } // 獲取pending隊列隊頭的任務(先進先出原則),喚醒該阻塞的任務 PoolEntryFuture<E> future = pool.nextPending(); if (future != null) { this.pending.remove(future); } else { future = this.pending.poll(); } if (future != null) { future.wakeup(); } } } finally { // 釋放鎖 this.lock.unlock(); } }
AbstractConnPool
其實就是經過在獲取鏈接、釋放鏈接時加鎖,來實現線程安全,思路很是簡單,但它沒有在route對應的鏈接池中加鎖對象,即 RouteSpecificPool
的獲取鏈接、釋放鏈接操做是不加鎖的,由於已經在 AbstractConnPool
的外部調用中加鎖,因此是線程安全的,簡化了設計。
另外每一個route
對應一個鏈接池,實現了在host
級別的隔離,若下游的某臺提供服務的主機掛了,無效的鏈接最多隻佔用該route
對應的鏈接池,不會佔用整個鏈接池,從而拖垮整個服務。
以上。