Http持久鏈接與HttpClient鏈接池

Tips:關注公衆號:松花皮蛋的黑板報,領取程序員月薪25K+祕籍,進軍BAT必備!程序員

1、背景

HTTP協議是無狀態的協議,即每一次請求都是互相獨立的。所以它的最初實現是,每個http請求都會打開一個tcp socket鏈接,當交互完畢後會關閉這個鏈接。瀏覽器

HTTP協議是全雙工的協議,因此創建鏈接與斷開鏈接是要通過三次握手與四次揮手的。顯然在這種設計中,每次發送Http請求都會消耗不少的額外資源,即鏈接的創建與銷燬。安全

因而,HTTP協議的也進行了發展,經過持久鏈接的方法來進行socket鏈接複用。bash


從圖中能夠看到:服務器

  1. 在串行鏈接中,每次交互都要打開關閉鏈接
  2. 在持久鏈接中,第一次交互會打開鏈接,交互結束後鏈接並不關閉,下次交互就省去了創建鏈接的過程。

持久鏈接的實現有兩種:HTTP/1.0+的keep-alive與HTTP/1.1的持久鏈接。微信

2、HTTP/1.0+的Keep-Alive

從1996年開始,不少HTTP/1.0瀏覽器與服務器都對協議進行了擴展,那就是「keep-alive」擴展協議。app

注意,這個擴展協議是做爲1.0的補充的「實驗型持久鏈接」出現的。keep-alive已經再也不使用了,最新的HTTP/1.1規範中也沒有對它進行說明,只是不少應用延續了下來。less

使用HTTP/1.0的客戶端在首部中加上"Connection:Keep-Alive",請求服務端將一條鏈接保持在打開狀態。服務端若是願意將這條鏈接保持在打開狀態,就會在響應中包含一樣的首部。若是響應中沒有包含"Connection:Keep-Alive"首部,則客戶端會認爲服務端不支持keep-alive,會在發送完響應報文以後關閉掉當前鏈接。異步

經過keep-alive補充協議,客戶端與服務器之間完成了持久鏈接,然而仍然存在着一些問題:socket

  • 在HTTP/1.0中keep-alive不是標準協議,客戶端必須發送Connection:Keep-Alive來激活keep-alive鏈接。
  • 代理服務器可能沒法支持keep-alive,由於一些代理是"盲中繼",沒法理解首部的含義,只是將首部逐跳轉發。因此可能形成客戶端與服務端都保持了鏈接,可是代理不接受該鏈接上的數據。

3、HTTP/1.1的持久鏈接

HTTP/1.1採起持久鏈接的方式替代了Keep-Alive。

HTTP/1.1的鏈接默認狀況下都是持久鏈接。若是要顯式關閉,須要在報文中加上Connection:Close首部。即在HTTP/1.1中,全部的鏈接都進行了複用。

然而如同Keep-Alive同樣,空閒的持久鏈接也能夠隨時被客戶端與服務端關閉。不發送Connection:Close不意味着服務器承諾鏈接永遠保持打開。

4、HttpClient如何生成持久鏈接

HttpClien中使用了鏈接池來管理持有鏈接,同一條TCP鏈路上,鏈接是能夠複用的。HttpClient經過鏈接池的方式進行鏈接持久化。

其實「池」技術是一種通用的設計,其設計思想並不複雜:

  1. 當有鏈接第一次使用的時候創建鏈接
  2. 結束時對應鏈接不關閉,歸還到池中
  3. 下次同個目的的鏈接可從池中獲取一個可用鏈接
  4. 按期清理過時鏈接

全部的鏈接池都是這個思路,不過咱們看HttpClient源碼主要關注兩點:

  • 鏈接池的具體設計方案,以供之後自定義鏈接池參考
  • 如何與HTTP協議對應上,即理論抽象轉爲代碼的實現

4.1 HttpClient鏈接池的實現

HttpClient關於持久鏈接的處理在下面的代碼中能夠集中體現,下面從MainClientExec摘取了和鏈接池相關的部分,去掉了其餘部分:

public class MainClientExec implements ClientExecChain {

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
     //從鏈接管理器HttpClientConnectionManager中獲取一個鏈接請求ConnectionRequest
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);final HttpClientConnection managedConn;
        final int timeout = config.getConnectionRequestTimeout();        //從鏈接請求ConnectionRequest中獲取一個被管理的鏈接HttpClientConnection
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
     //將鏈接管理器HttpClientConnectionManager與被管理的鏈接HttpClientConnection交給一個ConnectionHolder持有
        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            HttpResponse response;
            if (!managedConn.isOpen()) {          //若是當前被管理的鏈接不是出於打開狀態,須要從新創建鏈接
                establishRoute(proxyAuthState, managedConn, route, request, context);
            }
       //經過鏈接HttpClientConnection發送請求
            response = requestExecutor.execute(request, managedConn, context);
       //經過鏈接重用策略判斷是否鏈接可重用         
            if (reuseStrategy.keepAlive(response, context)) {
                //得到鏈接有效期
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                //設置鏈接有效期
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);          //將當前鏈接標記爲可重用狀態
                connHolder.markReusable();
            } else {
                connHolder.markNonReusable();
            }
        }
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            //將當前鏈接釋放到池中,供下次調用
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        } else {
            return new HttpResponseProxy(response, connHolder);
        }
}複製代碼

這裏看到了在Http請求過程當中對鏈接的處理是和協議規範是一致的,這裏要展開講一下具體實現。

PoolingHttpClientConnectionManager是HttpClient默認的鏈接管理器,首先經過requestConnection()得到一個鏈接的請求,注意這裏不是鏈接。

public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
            @Override
            public boolean cancel() {
                return future.cancel(true);
            }
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
                if (conn.isOpen()) {
                    final HttpHost host;
                    if (route.getProxyHost() != null) {
                        host = route.getProxyHost();
                    } else {
                        host = route.getTargetHost();
                    }
                    final SocketConfig socketConfig = resolveSocketConfig(host);
                    conn.setSocketTimeout(socketConfig.getSoTimeout());
                }
                return conn;
            }
        };
    }複製代碼

能夠看到返回的ConnectionRequest對象其實是一個持有了Future<CPoolEntry>,CPoolEntry是被鏈接池管理的真正鏈接實例。

從上面的代碼咱們應該關注的是:

  • Future<CPoolEntry> future = this.pool.lease(route, state, null)
    •   如何從鏈接池CPool中得到一個異步的鏈接,Future<CPoolEntry>
  • HttpClientConnection conn = leaseConnection(future, timeout, tunit)
    •   如何經過異步鏈接Future<CPoolEntry>得到一個真正的鏈接HttpClientConnection

4.2 Future

看一下CPool是如何釋放一個Future<CPoolEntry>的,AbstractConnPool核心代碼以下:

private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {
     //首先對當前鏈接池加鎖,當前鎖是可重入鎖ReentrantLockthis.lock.lock();
        try {        //得到一個當前HttpRoute對應的鏈接池,對於HttpClient的鏈接池而言,總池有個大小,每一個route對應的鏈接也是個池,因此是「池中池」
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");          //死循環得到鏈接
                for (;;) {            //從route對應的池中拿鏈接,多是null,也多是有效鏈接
                    entry = pool.getFree(state);            //若是拿到null,就退出循環
                    if (entry == null) {
                        break;
                    }            //若是拿到過時鏈接或者已關閉鏈接,就釋放資源,繼續循環獲取
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {              //若是拿到有效鏈接就退出循環
                        break;
                    }
                }          //拿到有效鏈接就退出
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
          //到這裏證實沒有拿到有效鏈接,須要本身生成一個                
                final int maxPerRoute = getMax(route);
                //每一個route對應的鏈接最大數量是可配置的,若是超過了,就須要經過LRU清理掉一些鏈接
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
          //當前route池中的鏈接數,沒有達到上線
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);            //判斷鏈接池是否超過上線,若是超過了,須要經過LRU清理掉一些鏈接
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();               //若是空閒鏈接數已經大於剩餘可用空間,則須要清理下空閒鏈接
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }              //根據route創建一個鏈接
                        final C conn = this.connFactory.create(route);              //將這個鏈接放入route對應的「小池」中
                        entry = pool.add(conn);              //將這個鏈接放入「大池」中
                        this.leased.add(entry);
                        return entry;
                    }
                }
         //到這裏證實沒有從得到route池中得到有效鏈接,而且想要本身創建鏈接時當前route鏈接池已經到達最大值,即已經有鏈接在使用,可是對當前線程不可用
                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }            //將future放入route池中等待
                    pool.queue(future);            //將future放入大鏈接池中等待
                    this.pending.add(future);            //若是等待到了信號量的通知,success爲true
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    //從等待隊列中移除
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                //若是沒有等到信號量通知而且當前時間已經超時,則退出循環
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }       //最終也沒有等到信號量通知,沒有拿到可用鏈接,則拋異常
            throw new TimeoutException("Timeout waiting for connection");
        } finally {       //釋放對大鏈接池的鎖
            this.lock.unlock();
        }
    }複製代碼

上面的代碼邏輯有幾個重要點:

  • 鏈接池有個最大鏈接數,每一個route對應一個小鏈接池,也有個最大鏈接數
  • 不管是大鏈接池仍是小鏈接池,當超過數量的時候,都要經過LRU釋放一些鏈接
  • 若是拿到了可用鏈接,則返回給上層使用
  • 若是沒有拿到可用鏈接,HttpClient會判斷當前route鏈接池是否已經超過了最大數量,沒有到上限就會新建一個鏈接,並放入池中
  • 若是到達了上限,就排隊等待,等到了信號量,就從新得到一次,等待不到就拋超時異常
  • 經過線程池獲取鏈接要經過ReetrantLock加鎖,保證線程安全

到這裏爲止,程序已經拿到了一個可用的CPoolEntry實例,或者拋異常終止了程序。

4.3 HttpClientConnection

protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {       //從異步操做Future<CPoolEntry>中得到CPoolEntry
            entry = future.get(timeout, tunit);
            if (entry == null || future.isCancelled()) {
                throw new InterruptedException();
            }
            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
            }       //得到一個CPoolEntry的代理對象,對其操做都是使用同一個底層的HttpClientConnection
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }複製代碼

5、HttpClient如何複用持久鏈接?

在上一章中,咱們看到了HttpClient經過鏈接池來得到鏈接,當須要使用鏈接的時候從池中得到。

對應着第三章的問題:

  1. 當有鏈接第一次使用的時候創建鏈接
  2. 結束時對應鏈接不關閉,歸還到池中
  3. 下次同個目的的鏈接可從池中獲取一個可用鏈接
  4. 按期清理過時鏈接

咱們在第四章中看到了HttpClient是如何處理一、3的問題的,那麼第2個問題是怎麼處理的呢?

即HttpClient如何判斷一個鏈接在使用完畢後是要關閉,仍是要放入池中供他人複用?再看一下MainClientExec的代碼


          //發送Http鏈接                response = requestExecutor.execute(request, managedConn, context);
                //根據重用策略判斷當前鏈接是否要複用
                if (reuseStrategy.keepAlive(response, context)) {
                    //須要複用的鏈接,獲取鏈接超時時間,以response中的timeout爲準
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;               //timeout的是毫秒數,若是沒有設置則爲-1,即沒有超時時間
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }            //設置超時時間,當請求結束時鏈接管理器會根據超時時間決定是關閉仍是放回到池中
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    //將鏈接標記爲可重用            connHolder.markReusable();
                } else {            //將鏈接標記爲不可重用
                    connHolder.markNonReusable();
                }複製代碼

能夠看到,當使用鏈接發生過請求以後,有鏈接重試策略來決定該鏈接是否要重用,若是要重用就會在結束後交給HttpClientConnectionManager放入池中。

那麼鏈接複用策略的邏輯是怎麼樣的呢?

public class DefaultClientConnectionReuseStrategy extends DefaultConnectionReuseStrategy {

    public static final DefaultClientConnectionReuseStrategy INSTANCE = new DefaultClientConnectionReuseStrategy();

    @Override
    public boolean keepAlive(final HttpResponse response, final HttpContext context) {
     //從上下文中拿到request
        final HttpRequest request = (HttpRequest) context.getAttribute(HttpCoreContext.HTTP_REQUEST);
        if (request != null) {       //得到Connection的Header
            final Header[] connHeaders = request.getHeaders(HttpHeaders.CONNECTION);
            if (connHeaders.length != 0) {
                final TokenIterator ti = new BasicTokenIterator(new BasicHeaderIterator(connHeaders, null));
                while (ti.hasNext()) {
                    final String token = ti.nextToken();            //若是包含Connection:Close首部,則表明請求不打算保持鏈接,會忽略response的意願,該頭部這是HTTP/1.1的規範
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;
                    }
                }
            }
        }     //使用父類的的複用策略
        return super.keepAlive(response, context);
    }

}複製代碼

看一下父類的複用策略

if (canResponseHaveBody(request, response)) {
                final Header[] clhs = response.getHeaders(HTTP.CONTENT_LEN);
                //若是reponse的Content-Length沒有正確設置,則不復用鏈接          //由於對於持久化鏈接,兩次傳輸之間不須要從新創建鏈接,則須要根據Content-Length確認內容屬於哪次請求,以正確處理「粘包」現象                //因此,沒有正確設置Content-Length的response鏈接不能複用
                if (clhs.length == 1) {
                    final Header clh = clhs[0];
                    try {
                        final int contentLen = Integer.parseInt(clh.getValue());
                        if (contentLen < 0) {
                            return false;
                        }
                    } catch (final NumberFormatException ex) {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        if (headerIterator.hasNext()) {
            try {
                final TokenIterator ti = new BasicTokenIterator(headerIterator);
                boolean keepalive = false;
                while (ti.hasNext()) {
                    final String token = ti.nextToken();            //若是response有Connection:Close首部,則明確表示要關閉,則不復用
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;            //若是response有Connection:Keep-Alive首部,則明確表示要持久化,則複用
                    } else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) {
                        keepalive = true;
                    }
                }
                if (keepalive) {
                    return true;
                }
            } catch (final ParseException px) {
                return false;
            }
        }
     //若是response中沒有相關的Connection首部說明,則高於HTTP/1.0版本的都複用鏈接  
        return !ver.lessEquals(HttpVersion.HTTP_1_0);複製代碼

總結一下:

  • 若是request首部中包含Connection:Close,不復用
  • 若是response中Content-Length長度設置不正確,不復用
  • 若是response首部包含Connection:Close,不復用
  • 若是reponse首部包含Connection:Keep-Alive,複用
  • 都沒命中的狀況下,若是HTTP版本高於1.0則複用

從代碼中能夠看到,其實現策略與咱們第2、三章協議層的約束是一致的。

6、HttpClient如何清理過時鏈接

在HttpClient4.4版本以前,在從鏈接池中獲取重用鏈接的時候會檢查下是否過時,過時則清理。

以後的版本則不一樣,會有一個單獨的線程來掃描鏈接池中的鏈接,發現有離最近一次使用超過設置的時間後,就會清理。默認的超時時間是2秒鐘。

public CloseableHttpClient build() {            //若是指定了要清理過時鏈接與空閒鏈接,纔會啓動清理線程,默認是不啓動的
            if (evictExpiredConnections || evictIdleConnections) {          //創造一個鏈接池的清理線程
                final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
                        maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS,
                        maxIdleTime, maxIdleTimeUnit);
                closeablesCopy.add(new Closeable() {
                    @Override
                    public void close() throws IOException {
                        connectionEvictor.shutdown();
                        try {
                            connectionEvictor.awaitTermination(1L, TimeUnit.SECONDS);
                        } catch (final InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }

                });          //執行該清理線程
                connectionEvictor.start();
}複製代碼

能夠看到在HttpClientBuilder進行build的時候,若是指定了開啓清理功能,會建立一個鏈接池清理線程並運行它。

public IdleConnectionEvictor(
            final HttpClientConnectionManager connectionManager,
            final ThreadFactory threadFactory,
            final long sleepTime, final TimeUnit sleepTimeUnit,
            final long maxIdleTime, final TimeUnit maxIdleTimeUnit) {
        this.connectionManager = Args.notNull(connectionManager, "Connection manager");
        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
        this.sleepTimeMs = sleepTimeUnit != null ? sleepTimeUnit.toMillis(sleepTime) : sleepTime;
        this.maxIdleTimeMs = maxIdleTimeUnit != null ? maxIdleTimeUnit.toMillis(maxIdleTime) : maxIdleTime;
        this.thread = this.threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                try {            //死循環,線程一直執行
                    while (!Thread.currentThread().isInterrupted()) {              //休息若干秒後執行,默認10秒
                        Thread.sleep(sleepTimeMs);               //清理過時鏈接
                        connectionManager.closeExpiredConnections();               //若是指定了最大空閒時間,則清理空閒鏈接
                        if (maxIdleTimeMs > 0) {
                            connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (final Exception ex) {
                    exception = ex;
                }

            }
        });
    }複製代碼

總結一下:

  • 只有在HttpClientBuilder手動設置後,纔會開啓清理過時與空閒鏈接
  • 手動設置後,會啓動一個線程死循環執行,每次執行sleep必定時間,調用HttpClientConnectionManager的清理方法清理過時與空閒鏈接。

7、本文總結

  • HTTP協議經過持久鏈接的方式,減輕了早期設計中的過多鏈接問題
  • 持久鏈接有兩種方式:HTTP/1.0+的Keep-Avlive與HTTP/1.1的默認持久鏈接
  • HttpClient經過鏈接池來管理持久鏈接,鏈接池分爲兩個,一個是總鏈接池,一個是每一個route對應的鏈接池
  • HttpClient經過異步的Future<CPoolEntry>來獲取一個池化的鏈接
  • 默認鏈接重用策略與HTTP協議約束一致,根據response先判斷Connection:Close則關閉,在判斷Connection:Keep-Alive則開啓,最後版本大於1.0則開啓
  • 只有在HttpClientBuilder中手動開啓了清理過時與空閒鏈接的開關後,纔會清理鏈接池中的鏈接
  • HttpClient4.4以後的版本經過一個死循環線程清理過時與空閒鏈接,該線程每次執行都sleep一會,以達到按期執行的效果

上面的研究是基於HttpClient源碼的我的理解,若是有誤,但願你們積極留言討論。


文章來源:www.liangsonghua.me

關注微信公衆號:松花皮蛋的黑板報,獲取更多精彩!

公衆號介紹:分享在京東工做的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

相關文章
相關標籤/搜索