Http請求鏈接池-HttpClient的AbstractConnPool源碼分析

背景

在作服務化拆分的時候,若不是性能要求特別高的場景,咱們通常對外暴露Http服務。Spring裏提供了一個模板類RestTemplate,經過配置RestTemplate,咱們能夠快速地訪問外部的Http服務。Http底層是經過Tcp的三次握手創建鏈接的,若每一個請求都要從新創建鏈接,那開銷是很大的,特別是對於消息體很是小的場景,開銷更大。segmentfault

若使用鏈接池的方式,來管理鏈接對象,能極大地提升服務的吞吐量。設計模式

RestTemplate底層是封裝了HttpClient(筆者的版本是4.3.6),它提供了鏈接池機制來處理高併發網絡請求。安全

示例

一般,咱們採用以下的樣板代碼來構建HttpClient:網絡

HttpClientBuilder builder = HttpClientBuilder.create();
    builder.setMaxConnTotal(maxConnections).setMaxConnPerRoute(maxConnectionsPerRoute);
    if (!connectionReuse) {
        builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
    }
    if (!automaticRetry) {
        builder.disableAutomaticRetries();
    }
    if (!compress) {
        builder.disableContentCompression();
    }
    HttpClient httpClient = builder.build();

從上面的代碼能夠看出,HttpClient使用建造者設計模式來構造對象,最後一行代碼構建對象,前面的代碼是用來設置客戶端的最大鏈接數、單路由最大鏈接數、是否使用長鏈接、壓縮等特性。併發

源碼分析

咱們進入HttpClientBuilder的build()方法,會看到以下代碼:ide

# 構造Http鏈接池管理器
    final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
            RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", sslSocketFactory)
                .build());
    if (defaultSocketConfig != null) {
        poolingmgr.setDefaultSocketConfig(defaultSocketConfig);
    }
    if (defaultConnectionConfig != null) {
        poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig);
    }
    if (systemProperties) {
        String s = System.getProperty("http.keepAlive", "true");
        if ("true".equalsIgnoreCase(s)) {
            s = System.getProperty("http.maxConnections", "5");
            final int max = Integer.parseInt(s);
            poolingmgr.setDefaultMaxPerRoute(max);
            poolingmgr.setMaxTotal(2 * max);
        }
    }
    if (maxConnTotal > 0) {
        poolingmgr.setMaxTotal(maxConnTotal);
    }
    if (maxConnPerRoute > 0) {
        poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute);
    }
    # Http鏈接管理器採用鏈接池的方式實現
    connManager = poolingmgr;

默認狀況下構造出的Http鏈接管理器是採用鏈接池的方式實現的。函數

咱們進入 PoolingHttpClientConnectionManager的代碼,其鏈接池的核心實現是依賴於 CPool類,而 CPool又繼承了抽象類AbstractConnPool AbstractConnPool@ThreadSafe的註解,說明它是線程安全類,因此 HttpClient線程安全地獲取、釋放鏈接都依賴於 AbstractConnPool高併發

接下來我來看最核心的AbstractConnPool類,如下是鏈接池的結構圖:oop

alt text

鏈接池最重要的兩個公有方法是 leaserelease,即獲取鏈接和釋放鏈接的兩個方法。源碼分析

lease 獲取鏈接

@Override
    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");
        return new PoolEntryFuture<E>(this.lock, callback) {

            @Override
            public E getPoolEntry(
                    final long timeout,
                    final TimeUnit tunit)
                        throws InterruptedException, TimeoutException, IOException {
                final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
                onLease(entry);
                return entry;
            }

        };
    }

lease方法返回的是一個 Future對象,即須要調用 Futureget方法,才能夠獲得PoolEntry的對象,它包含了一個鏈接的具體信息。

而獲取鏈接是經過 getPoolEntryBlocking方法實現的,經過函數名能夠知道,這是一個阻塞的方法,即該route所對應的鏈接池中的鏈接不夠用時,該方法就會阻塞,直到該 route所對應的鏈接池有鏈接釋放,方法纔會被喚醒;或者方法一直等待,直到鏈接超時拋出異常。

private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final PoolEntryFuture<E> future)
                throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        // 設置鏈接超時時間戳
        if (timeout > 0) {
            deadline = new Date
                (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        // 獲取鏈接,並修改修改鏈接池,因此加鎖--->線程安全
        this.lock.lock();
        try {
            // 從Map中獲取該route對應的鏈接池,若Map中沒有,則建立該route對應的鏈接池
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry = null;
            while (entry == null) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    // 獲取 同一狀態的 空閒鏈接,即從available鏈表的頭部中移除,添加到leased集合中
                    entry = pool.getFree(state);
                    // 若返回鏈接爲空,跳出循環
                    if (entry == null) {
                        break;
                    }
                    // 若鏈接已過時,則關閉鏈接
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    } else if (this.validateAfterInactivity > 0) {
                        if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                            if (!validate(entry)) {
                                entry.close();
                            }
                        }
                    }
                    if (entry.isClosed()) {
                        // 若該鏈接已關閉,則總的available鏈表中刪除該鏈接
                        this.available.remove(entry);
                        // 從該route對應的鏈接池的leased集合中刪除該鏈接,而且不回收到available鏈表中                        
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                // 跳出for循環
                if (entry != null) {
                    // 若獲取的鏈接不爲空,將鏈接從總的available鏈表移除,並添加到leased集合中
                    // 獲取鏈接成功,直接返回
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
                // 計算該route的最大鏈接數
                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                  // 計算該route鏈接池中的鏈接數 是否 大於等於 route最大鏈接數
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                // 若大於等於 route最大鏈接數,則收縮該route的鏈接池
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        // 獲取該route鏈接池中最不經常使用的空閒鏈接,即available鏈表末尾的鏈接
                        // 由於回收鏈接時,老是將鏈接添加到available鏈表的頭部,因此鏈表尾部的鏈接是最有可能過時的
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        // 關閉鏈接,並從總的空閒鏈表以及route對應的鏈接池中刪除
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
                // 該route的鏈接池大小 小於 route最大鏈接數
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        // 總的空閒鏈接數 大於等於 總的鏈接池剩餘容量
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                // 從總的available鏈表中 以及 route對應的鏈接池中 刪除鏈接,並關閉鏈接
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        // 建立新鏈接,並添加到總的leased集合以及route鏈接池的leased集合中,函數返回
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
                
                //route的鏈接池已滿,沒法分配鏈接
                boolean success = false;
                try {
                    // 將該獲取鏈接的任務放入pending隊列
                    pool.queue(future);
                    this.pending.add(future);
                    // 阻塞等待,若在超時以前被喚醒,則返回true;若直到超時才返回,則返回false
                    success = future.await(deadline);
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    // 不管是 被喚醒返回、超時返回 仍是被 中斷異常返回,都會進入finally代碼段
                    // 從pending隊列中移除
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                // 判斷是僞喚醒 仍是 鏈接超時
                // 如果 鏈接超時,則跳出while循環,並拋出 鏈接超時的異常;
                // 如果 僞喚醒,則繼續循環獲取鏈接
                if (!success && (deadline != null) &&
                    (deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            // 釋放鎖
            this.lock.unlock();
        }
    }

release 釋放鏈接

@Override
    public void release(final E entry, final boolean reusable) {
        // 獲取鎖
        this.lock.lock();
        try {
            // 從總的leased集合中移除鏈接
            if (this.leased.remove(entry)) {
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                // 回收鏈接
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown) {
                    this.available.addFirst(entry);
                    onRelease(entry);
                } else {
                    entry.close();
                }
                // 獲取pending隊列隊頭的任務(先進先出原則),喚醒該阻塞的任務
                PoolEntryFuture<E> future = pool.nextPending();
                if (future != null) {
                    this.pending.remove(future);
                } else {
                    future = this.pending.poll();
                }
                if (future != null) {
                    future.wakeup();
                }
            }
        } finally {
            // 釋放鎖
            this.lock.unlock();
        }
    }

總結

AbstractConnPool其實就是經過在獲取鏈接、釋放鏈接時加鎖,來實現線程安全,思路很是簡單,但它沒有在route對應的鏈接池中加鎖對象,即 RouteSpecificPool的獲取鏈接、釋放鏈接操做是不加鎖的,由於已經在 AbstractConnPool的外部調用中加鎖,因此是線程安全的,簡化了設計。

另外每一個route對應一個鏈接池,實現了在host級別的隔離,若下游的某臺提供服務的主機掛了,無效的鏈接最多隻佔用該route對應的鏈接池,不會佔用整個鏈接池,從而拖垮整個服務。

以上。

原文連接

https://segmentfault.com/a/11...

相關文章
相關標籤/搜索