Andriod 網絡框架 OkHttp 源碼解析

一、OkHttp 的基本使用

OkHttp 是 Square 的一款應用於 Android 和 Java 的 Http 和 Http/2 客戶端。使用的時候只須要在 Gradle 裏面加入下面一行依賴便可引入:java

implementation 'com.squareup.okhttp3:okhttp:3.11.0'
複製代碼

咱們知道,Http 請求有多種類型,經常使用的分爲 Get 和 Post,而 POST 又分爲 Form 和 Multiple 等。下面咱們以 Form 類型的請求爲例來看下 OkHttp 的 API 設計邏輯:web

OkHttpClient internalHttpClient = new OkHttpClient();
FormBody.Builder formBodyBuilder = new FormBody.Builder();
RequestBody body = formBodyBuilder.build();
Request.Builder builder = new Request.Builder().url("host:port/url").post(body);
Request request = builder.build();
Response response = internalHttpClient.newCall(request).execute();
String retJson = response.body().string();
複製代碼

這裏咱們先用了 FormBody 的構建者模式建立 Form 類型請求的請求體,而後使用 Request 的構建者建立完整的 Form 請求。以後,咱們用建立好的 OkHttp 客戶端 internalHttpClient 來獲取一個請求,並從請求的請求體中獲取 Json 數據。面試

根據 OkHttp 的 API,若是咱們但願發送一個 Multipart 類型的請求的時候就須要使用 MultipartBody 的構建者建立 Multipart 請求的請求體。而後一樣使用 Request 的構建者建立完整的 Multipart 請求,剩下的邏輯相同。算法

除了使用上面的直接實例化一個 OkHttp 客戶端的方式,咱們也可使用 OkHttpClient 的構建者 OkHttpClient.Builder 來建立 OkHttp 客戶端。設計模式

因此,咱們能夠總結:緩存

  1. OkHttp 爲不一樣的請求類型都提供了一個構建者方法用來建立請求體 RequestBody
  2. 由於請求體只是整個請求的一部分,因此,又要用 Request.Builder 構建一個請求對象 Request
  3. 這樣咱們獲得了一個完整的 Http 請求,而後使用 OkHttpClient 對象進行網絡訪問獲得響應對象 Response

OkHttp 自己的設計比較友好,思路很是清晰,按照上面的思路搞懂了人家的 API 設計邏輯,本身再基於 OkHttp 封裝一個庫天然問題不大。服務器

二、OkHttp 源碼分析

上面咱們提到的一些是基礎的 API 類,是提供給用戶使用的。這些類的設計只是基於構建者模式,很是容易理解。這裏咱們關注點也不在這些 API 類上面,而是 OkHttp 內部的請求執行相關的類。下面咱們就開始對 OkHttp 的請求過程進行源碼分析(源碼版本:3.10.0)。cookie

2.1 一個請求的大體流程

參考以前的示例程序,拋棄構建請求的過程不講,單從請求的發送過程來看,咱們的線索應該從 OkHttpClient.newCall(Request) 開始。下面是這個方法的定義,它會建立一個 RealCall 對象,並把 OkHttpClient 對象和 Request 對象做爲參數傳入進去:網絡

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}
複製代碼

而後,RealCall 調用內部的靜態方法 newRealCall 在其中建立一個 RealCall 實例並將其返回:併發

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
}
複製代碼

而後,當返回了 RealCall 以後,咱們又會調用它的 execute() 方法來獲取響應結果,下面是這個方法的定義:

@Override public Response execute() throws IOException {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        try {
            // 加入到一個雙端隊列中
            client.dispatcher().executed(this);
            // 從這裏拿的響應Response
            Response result = getResponseWithInterceptorChain();
            if (result == null) throw new IOException("Canceled");
            return result;
        } catch (IOException e) {
            eventListener.callFailed(this, e);
            throw e;
        } finally {
            client.dispatcher().finished(this);
        }
    }
複製代碼

這裏咱們會用 client 對象(實際也就是上面建立 RealCall 的時候傳入的 OkHttpClient)的 dispatcher() 方法來獲取一個 Dispatcher 對象,並調用它的 executed() 方法來將當前的 RealCall 加入到一個雙端隊列中,下面是 executed(RealCall) 方法的定義,這裏的 runningSyncCalls 的類型是 Deque<RealCall>

synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
    }
複製代碼

讓咱們回到上面的 execute() 方法,在把 RealCall 加入到雙端隊列以後,咱們又調用了 getResponseWithInterceptorChain() 方法,下面就是該方法的定義。

Response getResponseWithInterceptorChain() throws IOException {
        // 添加一系列攔截器,注意添加的順序
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        // 橋攔截器
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        // 緩存攔截器:從緩存中拿數據
        interceptors.add(new CacheInterceptor(client.internalCache()));
        // 網絡鏈接攔截器:創建網絡鏈接
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
            interceptors.addAll(client.networkInterceptors());
        }
        // 服務器請求攔截器:向服務器發起請求獲取數據
        interceptors.add(new CallServerInterceptor(forWebSocket));
        // 構建一條責任鏈
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
        // 處理責任鏈
        return chain.proceed(originalRequest);
    }
複製代碼

這裏,咱們建立了一個列表對象以後把 client 中的攔截器、重連攔截器、橋攔截器、緩存攔截器、網絡鏈接攔截器和服務器請求攔截器等依次加入到列表中。而後,咱們用這個列表建立了一個攔截器鏈。這裏使用了責任鏈設計模式,每當一個攔截器執行完畢以後會調用下一個攔截器或者不調用並返回結果。顯然,咱們最終拿到的響應就是這個鏈條執行以後返回的結果。當咱們自定義一個攔截器的時候,也會被加入到這個攔截器鏈條裏。

這裏咱們遇到了不少的新類,好比 RealCallDispatcher 以及責任鏈等。下文中,咱們會對這些類之間的關係以及責任鏈中的環節作一個分析,而這裏咱們先對整個請求的流程作一個大體的梳理。下面是這個過程大體的時序圖:

OkHttp請求時序圖

2.2 分發器 Dispatcher

上面咱們提到了 Dispatcher 這個類,它的做用是對請求進行分發。以最開始的示例代碼爲例,在使用 OkHttp 的時候,咱們會建立一個 RealCall 並將其加入到雙端隊列中。可是請注意這裏的雙端隊列的名稱是 runningSyncCalls,也就是說這種請求是同步請求,會在當前的線程中當即被執行。因此,下面的 getResponseWithInterceptorChain() 就是這個同步的執行過程。而當咱們執行完畢的時候,又會調用 Dispatcherfinished(RealCall) 方法把該請求從隊列中移除。因此,這種同步的請求沒法體現分發器的「分發」功能。

除了同步的請求,還有異步類型的請求:當咱們拿到了 RealCall 的時候,調用它的 enqueue(Callback responseCallback) 方法並設置一個回調便可。該方法會執行下面這行代碼:

client.dispatcher().enqueue(new AsyncCall(responseCallback));
複製代碼

即便用上面的回調建立一個 AsyncCall 並調用 enqueue(AsyncCall)。這裏的 AsyncCall 間接繼承自 Runnable,是一個可執行的對象,而且會在 Runnablerun() 方法裏面調用 AsyncCallexecute() 方法。AsyncCallexecute() 方法與 RealCallexecute() 方法相似,都使用責任鏈來完成一個網絡請求。只是後者能夠放在一個異步的線程中進行執行。

當咱們調用了 Dispatcherenqueue(AsyncCall) 方法的時候也會將 AsyncCall 加入到一個隊列中,並會在請求執行完畢的時候從該隊列中移除,只是這裏的隊列是 runningAsyncCalls 或者 readyAsyncCalls。它們都是一個雙端隊列,並用來存儲異步類型的請求。它們的區別是,runningAsyncCalls 是正在執行的隊列,當正在執行的隊列達到了限制的時候,就會將其放置到就緒隊列 readyAsyncCalls 中:

synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
            runningAsyncCalls.add(call);
            executorService().execute(call);
        } else {
            readyAsyncCalls.add(call);
        }
    }
複製代碼

當把該請求加入到了正在執行的隊列以後,咱們會當即使用一個線程池來執行該 AsyncCall。這樣這個請求的責任鏈就會在一個線程池當中被異步地執行了。這裏的線程池由 executorService() 方法返回:

public synchronized ExecutorService executorService() {
        if (executorService == null) {
            executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    }
複製代碼

顯然,當線程池不存在的時候會去建立一個線程池。除了上面的這種方式,咱們還能夠在構建 OkHttpClient 的時候,自定義一個 Dispacher,並在其構造方法中爲其指定一個線程池。下面咱們類比 OkHttp 的同步請求繪製了一個異步請求的時序圖。你能夠經過將兩個圖對比來了解兩種實現方式的不一樣:

OkHttp異步請求

以上就是分發器 Dispacher 的邏輯,看上去並無那麼複雜。而且從上面的分析中,咱們能夠看出實際請求的執行過程並非在這裏完成的,這裏只能決定在哪一個線程當中執行請求並把請求用雙端隊列緩存下來,而實際的請求執行過程是在責任鏈中完成的。下面咱們就來分析一下 OkHttp 裏的責任鏈的執行過程。

2.3 責任鏈的執行過程

在典型的責任鏈設計模式裏,不少對象由每個對象對其下級的引用而鏈接起來造成一條鏈。請求在這個鏈上傳遞,直到鏈上的某一個對象決定處理此請求。發出這個請求的客戶端並不知道鏈上的哪個對象最終處理這個請求,這使得系統能夠在不影響客戶端的狀況下動態地從新組織和分配責任。責任鏈在現實生活中的一種場景就是面試,當某輪面試官以爲你沒有資格進入下一輪的時候能夠否認你,否則會讓下一輪的面試官繼續面試。

在 OkHttp 裏面,責任鏈的執行模式與之稍有不一樣。這裏咱們主要來分析一下在 OkHttp 裏面,責任鏈是如何執行的,至於每一個鏈條裏面的具體邏輯,咱們會在隨後一一說明。

回到 2.1 的代碼,有兩個地方須要咱們注意:

  1. 是當建立一個責任鏈 RealInterceptorChain 的時候,咱們傳入的第 5 個參數是 0。該參數名爲 index,會被賦值給 RealInterceptorChain 實例內部的同名全局變量。
  2. 當啓用責任鏈的時候,會調用它的 proceed(Request) 方法。

下面是 proceed(Request) 方法的定義:

@Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
    }
複製代碼

這裏又調用了內部的重載的 proceed() 方法。下面咱們對該方法進行了簡化:

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();
        // ...
        // 調用責任鏈的下一個攔截器
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
        // ...
        return response;
    }
複製代碼

注意到這裏使用責任鏈進行處理的時候,會新建下一個責任鏈並把 index+1 做爲下一個責任鏈的 index。而後,咱們使用 index 從攔截器列表中取出一個攔截器,調用它的 intercept() 方法,並把下一個執行鏈做爲參數傳遞進去。

這樣,當下一個攔截器但願本身的下一級繼續處理這個請求的時候,能夠調用傳入的責任鏈的 proceed() 方法;若是本身處理完畢以後,下一級不須要繼續處理,那麼就直接返回一個 Response 實例便可。由於,每次都是在當前的 index 基礎上面加 1,因此能在調用 proceed() 的時候準確地從攔截器列表中取出下一個攔截器進行處理。

咱們還要注意的地方是以前提到太重試攔截器,這種攔截器會在內部啓動一個 while 循環,並在循環體中調用執行鏈的 proceed() 方法來實現請求的不斷重試。這是由於在它那裏的攔截器鏈的 index 是固定的,因此可以每次調用 proceed() 的時候,都可以從本身的下一級執行一遍鏈條。下面就是這個責任鏈的執行過程:

責任鏈執行過程

清楚了 OkHttp 的攔截器鏈的執行過程以後,咱們來看一下各個攔截器作了什麼邏輯。

2.3 重試和重定向:RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor 主要用來當請求失敗的時候進行重試,以及在須要的狀況下進行重定向。咱們上面說,責任鏈會在進行處理的時候調用第一個攔截器的 intercept() 方法。若是咱們在建立 OkHttp 客戶端的時候沒有加入自定義攔截器,那麼 RetryAndFollowUpInterceptor 就是咱們的責任鏈中最早被調用的攔截器。

@Override public Response intercept(Chain chain) throws IOException {
        // ...
        // 注意這裏咱們初始化了一個 StreamAllocation 並賦值給全局變量,它的做用咱們後面會提到
        StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
                createAddress(request.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
        // 用來記錄重定向的次數
        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
            if (canceled) {
                streamAllocation.release();
                throw new IOException("Canceled");
            }

            Response response;
            boolean releaseConnection = true;
            try {
                // 這裏從當前的責任鏈開始執行一遍責任鏈,是一種重試的邏輯
                response = realChain.proceed(request, streamAllocation, null, null);
                releaseConnection = false;
            } catch (RouteException e) {
                // 調用 recover 方法從失敗中進行恢復,若是能夠恢復就返回true,不然返回false
                if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
                    throw e.getLastConnectException();
                }
                releaseConnection = false;
                continue;
            } catch (IOException e) {
                // 重試與服務器進行鏈接
                boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
                if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
                releaseConnection = false;
                continue;
            } finally {
                // 若是 releaseConnection 爲 true 則代表中間出現了異常,須要釋放資源
                if (releaseConnection) {
                    streamAllocation.streamFailed(null);
                    streamAllocation.release();
                }
            }

            // 使用以前的響應 priorResponse 構建一個響應,這種響應的響應體 body 爲空
            if (priorResponse != null) {
                response = response.newBuilder()
                        .priorResponse(priorResponse.newBuilder().body(null).build())
                        .build();
            }

            // 根據獲得的響應進行處理,可能會增長一些認證信息、重定向或者處理超時請求
            // 若是該請求沒法繼續被處理或者出現的錯誤不須要繼續處理,將會返回 null
            Request followUp = followUpRequest(response, streamAllocation.route());

            // 沒法重定向,直接返回以前的響應
            if (followUp == null) {
                if (!forWebSocket) {
                    streamAllocation.release();
                }
                return response;
            }

            // 關閉資源
            closeQuietly(response.body());

            // 達到了重定向的最大次數,就拋出一個異常
            if (++followUpCount > MAX_FOLLOW_UPS) {
                streamAllocation.release();
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }

            if (followUp.body() instanceof UnrepeatableRequestBody) {
                streamAllocation.release();
                throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
            }

            // 這裏判斷新的請求是否可以複用以前的鏈接,若是沒法複用,則建立一個新的鏈接
            if (!sameConnection(response, followUp.url())) {
                streamAllocation.release();
                streamAllocation = new StreamAllocation(client.connectionPool(),
                        createAddress(followUp.url()), call, eventListener, callStackTrace);
                this.streamAllocation = streamAllocation;
            } else if (streamAllocation.codec() != null) {
                throw new IllegalStateException("Closing the body of " + response
                        + " didn't close its backing stream. Bad interceptor?");
            }

            request = followUp;
            priorResponse = response;
        }
    }
複製代碼

以上的代碼主要用來根據錯誤的信息作一些處理,會根據服務器返回的信息判斷這個請求是否能夠重定向,或者是否有必要進行重試。若是值得去重試就會新建或者複用以前的鏈接在下一次循環中進行請求重試,不然就將獲得的請求包裝以後返回給用戶。這裏,咱們提到了 StreamAllocation 對象,它至關於一個管理類,維護了服務器鏈接、併發流和請求之間的關係,該類還會初始化一個 Socket 鏈接對象,獲取輸入/輸出流對象。同時,還要注意這裏咱們經過 client.connectionPool() 傳入了一個鏈接池對象 ConnectionPool。這裏咱們只是初始化了這些類,但實際在當前的方法中並無真正用到這些類,而是把它們傳遞到下面的攔截器裏來從服務器中獲取請求的響應。稍後,咱們會說明這些類的用途,以及之間的關係。

2.4 BridgeInterceptor

橋攔截器 BridgeInterceptor 用於從用戶的請求中構建網絡請求,而後使用該請求訪問網絡,最後從網絡響應當中構建用戶響應。相對來講這個攔截器的邏輯比較簡單,只是用來對請求進行包裝,並將服務器響應轉換成用戶友好的響應:

public final class BridgeInterceptor implements Interceptor {
        @Override public Response intercept(Chain chain) throws IOException {
            Request userRequest = chain.request();
            // 從用戶請求中獲取網絡請求構建者
            Request.Builder requestBuilder = userRequest.newBuilder();
            // ...
            // 執行網絡請求
            Response networkResponse = chain.proceed(requestBuilder.build());
            // ...
            // 從網絡響應中獲取用戶響應構建者
            Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);
            // ...
            // 返回用戶響應
            return responseBuilder.build();
        }
    }
複製代碼

2.5 使用緩存:CacheInterceptor

緩存攔截器會根據請求的信息和緩存的響應的信息來判斷是否存在緩存可用,若是有可使用的緩存,那麼就返回該緩存該用戶,不然就繼續責任鏈來從服務器中獲取響應。當獲取到響應的時候,又會把響應緩存到磁盤上面。如下是這部分的邏輯:

public final class CacheInterceptor implements Interceptor {
        @Override public Response intercept(Chain chain) throws IOException {
            Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;
            long now = System.currentTimeMillis();
            // 根據請求和緩存的響應中的信息來判斷是否存在緩存可用
            CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
            Request networkRequest = strategy.networkRequest; // 若是該請求沒有使用網絡就爲空
            Response cacheResponse = strategy.cacheResponse; // 若是該請求沒有使用緩存就爲空
            if (cache != null) {
                cache.trackResponse(strategy);
            }
            if (cacheCandidate != null && cacheResponse == null) {
                closeQuietly(cacheCandidate.body());
            }
            // 請求不使用網絡而且不使用緩存,至關於在這裏就攔截了,不必交給下一級(網絡請求攔截器)來執行
            if (networkRequest == null && cacheResponse == null) {
                return new Response.Builder()
                        .request(chain.request())
                        .protocol(Protocol.HTTP_1_1)
                        .code(504)
                        .message("Unsatisfiable Request (only-if-cached)")
                        .body(Util.EMPTY_RESPONSE)
                        .sentRequestAtMillis(-1L)
                        .receivedResponseAtMillis(System.currentTimeMillis())
                        .build();
            }
            // 該請求使用緩存,可是不使用網絡:從緩存中拿結果,不必交給下一級(網絡請求攔截器)執行
            if (networkRequest == null) {
                return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
            }
            Response networkResponse = null;
            try {
                // 這裏調用了執行鏈的處理方法,實際就是交給本身的下一級來執行了
                networkResponse = chain.proceed(networkRequest);
            } finally {
                if (networkResponse == null && cacheCandidate != null) {
                    closeQuietly(cacheCandidate.body());
                }
            }
            // 這裏當拿到了網絡請求以後調用,下一級執行完畢會交給它繼續執行,若是使用了緩存就把請求結果更新到緩存裏
            if (cacheResponse != null) {
                // 服務器返回的結果是304,返回緩存中的結果
                if (networkResponse.code() == HTTP_NOT_MODIFIED) {
                    Response response = cacheResponse.newBuilder()
                            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                            .cacheResponse(stripBody(cacheResponse))
                            .networkResponse(stripBody(networkResponse))
                            .build();
                    networkResponse.body().close();
                    cache.trackConditionalCacheHit();
                    // 更新緩存
                    cache.update(cacheResponse, response);
                    return response;
                } else {
                    closeQuietly(cacheResponse.body());
                }
            }
            Response response = networkResponse.newBuilder()
                    .cacheResponse(stripBody(cacheResponse))
                    .networkResponse(stripBody(networkResponse))
                    .build();
            // 把請求的結果放進緩存裏
            if (cache != null) {
                if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
                    CacheRequest cacheRequest = cache.put(response);
                    return cacheWritingResponse(cacheRequest, response);
                }
                if (HttpMethod.invalidatesCache(networkRequest.method())) {
                    try {
                        cache.remove(networkRequest);
                    } catch (IOException ignored) {
                        // The cache cannot be written.
                    }
                }
            }
            return response;
        }
    }
複製代碼

對緩存,這裏咱們使用的是全局變量 cache,它是 InternalCache 類型的變量。InternalCache 是一個接口,在 OkHttp 中只有一個實現類 Cache。在 Cache 內部,使用了 DiskLruCache 來將緩存的數據存到磁盤上。DiskLruCache 以及 LruCache 是 Android 上經常使用的兩種緩存策略。前者是基於磁盤來進行緩存的,後者是基於內存來進行緩存的,它們的核心思想都是 Least Recently Used,即最近最少使用算法。咱們會在之後的文章中詳細介紹這兩種緩存框架,也請繼續關注咱們的文章。

另外,上面咱們根據請求和緩存的響應中的信息來判斷是否存在緩存可用的時候用到了 CacheStrategy 的兩個字段,獲得這兩個字段的時候使用了很是多的判斷,其中涉及 Http 緩存相關的知識,感興趣的話能夠本身參考源代碼。

2.6 鏈接複用:ConnectInterceptor

鏈接攔截器 ConnectInterceptor 用來打開到指定服務器的網絡鏈接,並交給下一個攔截器處理。這裏咱們只打開了一個網絡鏈接,可是並無發送請求到服務器。從服務器獲取數據的邏輯交給下一級的攔截器來執行。雖然,這裏並無真正地從網絡中獲取數據,而僅僅是打開一個鏈接,但這裏有很多的內容值得咱們去關注。由於在獲取鏈接對象的時候,使用了鏈接池 ConnectionPool 來複用鏈接。

public final class ConnectInterceptor implements Interceptor {

        @Override public Response intercept(Chain chain) throws IOException {
            RealInterceptorChain realChain = (RealInterceptorChain) chain;
            Request request = realChain.request();
            StreamAllocation streamAllocation = realChain.streamAllocation();

            boolean doExtensiveHealthChecks = !request.method().equals("GET");
            HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
            RealConnection connection = streamAllocation.connection();

            return realChain.proceed(request, streamAllocation, httpCodec, connection);
        }
    }
複製代碼

這裏的 HttpCodec 用來編碼請求並解碼響應,RealConnection 用來向服務器發起鏈接。它們會在下一個攔截器中被用來從服務器中獲取響應信息。下一個攔截器的邏輯並不複雜,這裏萬事具有以後,只要它來從服務器中讀取數據便可。能夠說,OkHttp 中的核心部分大概就在這裏,因此,咱們就先好好分析一下,這裏在建立鏈接的時候如何藉助鏈接池來實現鏈接複用的。

根據上面的代碼,當咱們調用 streamAllocationnewStream() 方法的時候,最終會通過一系列的判斷到達 StreamAllocation 中的 findConnection() 方法。

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        // ...
        synchronized (connectionPool) {
            // ...
            // 嘗試使用已分配的鏈接,已經分配的鏈接可能已經被限制建立新的流
            releasedConnection = this.connection;
            // 釋放當前鏈接的資源,若是該鏈接已經被限制建立新的流,就返回一個Socket以關閉鏈接
            toClose = releaseIfNoNewStreams();
            if (this.connection != null) {
                // 已分配鏈接,而且該鏈接可用
                result = this.connection;
                releasedConnection = null;
            }
            if (!reportedAcquired) {
                // 若是該鏈接從未被標記爲得到,不要標記爲發佈狀態,reportedAcquired 經過 acquire() 方法修改
                releasedConnection = null;
            }

            if (result == null) {
                // 嘗試供鏈接池中獲取一個鏈接
                Internal.instance.get(connectionPool, address, this, null);
                if (connection != null) {
                    foundPooledConnection = true;
                    result = connection;
                } else {
                    selectedRoute = route;
                }
            }
        }
        // 關閉鏈接
        closeQuietly(toClose);

        if (releasedConnection != null) {
            eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
        }
        if (result != null) {
            // 若是已經從鏈接池中獲取到了一個鏈接,就將其返回
            return result;
        }

        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
            newRouteSelection = true;
            routeSelection = routeSelector.next();
        }

        synchronized (connectionPool) {
            if (canceled) throw new IOException("Canceled");

            if (newRouteSelection) {
                // 根據一系列的 IP 地址從鏈接池中獲取一個連接
                List<Route> routes = routeSelection.getAll();
                for (int i = 0, size = routes.size(); i < size; i++) {
                    Route route = routes.get(i);
                    // 從鏈接池中獲取一個鏈接
                    Internal.instance.get(connectionPool, address, this, route);
                    if (connection != null) {
                        foundPooledConnection = true;
                        result = connection;
                        this.route = route;
                        break;
                    }
                }
            }

            if (!foundPooledConnection) {
                if (selectedRoute == null) {
                    selectedRoute = routeSelection.next();
                }

                // 建立一個新的鏈接,並將其分配,這樣咱們就能夠在握手以前進行終端
                route = selectedRoute;
                refusedStreamCount = 0;
                result = new RealConnection(connectionPool, selectedRoute);
                acquire(result, false);
            }
        }

        // 若是咱們在第二次的時候發現了一個池鏈接,那麼咱們就將其返回
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
            return result;
        }

        // 進行 TCP 和 TLS 握手
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                connectionRetryEnabled, call, eventListener);
        routeDatabase().connected(result.route());

        Socket socket = null;
        synchronized (connectionPool) {
            reportedAcquired = true;

            // 將該鏈接放進鏈接池中
            Internal.instance.put(connectionPool, result);

            // 若是同時建立了另外一個到同一地址的多路複用鏈接,釋放這個鏈接並獲取那個鏈接
            if (result.isMultiplexed()) {
                socket = Internal.instance.deduplicate(connectionPool, address, this);
                result = connection;
            }
        }
        closeQuietly(socket);

        eventListener.connectionAcquired(call, result);
        return result;
    }
複製代碼

該方法會被放置在一個循環當中被不停地調用以獲得一個可用的鏈接。它優先使用當前已經存在的鏈接,否則就使用鏈接池中存在的鏈接,再不行的話,就建立一個新的鏈接。因此,上面的代碼大體分紅三個部分:

  1. 判斷當前的鏈接是否可使用:流是否已經被關閉,而且已經被限制建立新的流;
  2. 若是當前的鏈接沒法使用,就從鏈接池中獲取一個鏈接;
  3. 鏈接池中也沒有發現可用的鏈接,建立一個新的鏈接,並進行握手,而後將其放到鏈接池中。

在從鏈接池中獲取一個鏈接的時候,使用了 Internalget() 方法。Internal 有一個靜態的實例,會在 OkHttpClient 的靜態代碼快中被初始化。咱們會在 Internalget() 中調用鏈接池的 get() 方法來獲得一個鏈接。

從上面的代碼中咱們也能夠看出,實際上,咱們使用鏈接複用的一個好處就是省去了進行 TCP 和 TLS 握手的一個過程。由於創建鏈接自己也是須要消耗一些時間的,鏈接被複用以後能夠提高咱們網絡訪問的效率。那麼這些鏈接被放置在鏈接池以後是如何進行管理的呢?咱們會在下文中分析 OkHttp 的 ConnectionPool 中是如何管理這些鏈接的。

2.7 CallServerInterceptor

服務器請求攔截器 CallServerInterceptor 用來向服務器發起請求並獲取數據。這是整個責任鏈的最後一個攔截器,這裏沒有再繼續調用執行鏈的處理方法,而是把拿到的響應處理以後直接返回給了上一級的攔截器:

public final class CallServerInterceptor implements Interceptor {

        @Override public Response intercept(Chain chain) throws IOException {
            RealInterceptorChain realChain = (RealInterceptorChain) chain;
            // 獲取 ConnectInterceptor 中初始化的 HttpCodec
            HttpCodec httpCodec = realChain.httpStream();
            // 獲取 RetryAndFollowUpInterceptor 中初始化的 StreamAllocation
            StreamAllocation streamAllocation = realChain.streamAllocation();
            // 獲取 ConnectInterceptor 中初始化的 RealConnection
            RealConnection connection = (RealConnection) realChain.connection();
            Request request = realChain.request();

            long sentRequestMillis = System.currentTimeMillis();

            realChain.eventListener().requestHeadersStart(realChain.call());
            // 在這裏寫入請求頭 
            httpCodec.writeRequestHeaders(request);
            realChain.eventListener().requestHeadersEnd(realChain.call(), request);

            Response.Builder responseBuilder = null;
            if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
                if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
                    httpCodec.flushRequest();
                    realChain.eventListener().responseHeadersStart(realChain.call());
                    responseBuilder = httpCodec.readResponseHeaders(true);
                }
                 // 在這裏寫入請求體
                if (responseBuilder == null) {
                    realChain.eventListener().requestBodyStart(realChain.call());
                    long contentLength = request.body().contentLength();
                    CountingSink requestBodyOut =
                            new CountingSink(httpCodec.createRequestBody(request, contentLength));
                    BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
                    // 寫入請求體
                    request.body().writeTo(bufferedRequestBody);
                    bufferedRequestBody.close();
                    realChain.eventListener()
                            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
                } else if (!connection.isMultiplexed()) {
                    streamAllocation.noNewStreams();
                }
            }
            httpCodec.finishRequest();
            if (responseBuilder == null) {
                realChain.eventListener().responseHeadersStart(realChain.call());
                // 讀取響應頭
                responseBuilder = httpCodec.readResponseHeaders(false);
            }
            Response response = responseBuilder
                    .request(request)
                    .handshake(streamAllocation.connection().handshake())
                    .sentRequestAtMillis(sentRequestMillis)
                    .receivedResponseAtMillis(System.currentTimeMillis())
                    .build();
            // 讀取響應體
            int code = response.code();
            if (code == 100) {
                responseBuilder = httpCodec.readResponseHeaders(false);
                response = responseBuilder
                        .request(request)
                        .handshake(streamAllocation.connection().handshake())
                        .sentRequestAtMillis(sentRequestMillis)
                        .receivedResponseAtMillis(System.currentTimeMillis())
                        .build();
                code = response.code();
            }
            realChain.eventListener().responseHeadersEnd(realChain.call(), response);
            if (forWebSocket && code == 101) {
                response = response.newBuilder()
                        .body(Util.EMPTY_RESPONSE)
                        .build();
            } else {
                response = response.newBuilder()
                        .body(httpCodec.openResponseBody(response))
                        .build();
            }
            // ...
            return response;
        }
    }
複製代碼

2.8 鏈接管理:ConnectionPool

與請求的緩存相似,OkHttp 的鏈接池也使用一個雙端隊列來緩存已經建立的鏈接:

private final Deque<RealConnection> connections = new ArrayDeque<>();
複製代碼

OkHttp 的緩存管理分紅兩個步驟,一邊當咱們建立了一個新的鏈接的時候,咱們要把它放進緩存裏面;另外一邊,咱們還要來對緩存進行清理。在 ConnectionPool 中,當咱們向鏈接池中緩存一個鏈接的時候,只要調用雙端隊列的 add() 方法,將其加入到雙端隊列便可,而清理鏈接緩存的操做則交給線程池來定時執行。

ConnectionPool 中存在一個靜態的線程池:

private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
        Integer.MAX_VALUE /* maximumPoolSize */, 
        60L /* keepAliveTime */,
        TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), 
        Util.threadFactory("OkHttp ConnectionPool", true));
複製代碼

每當咱們向鏈接池中插入一個鏈接的時候就會調用下面的方法,將鏈接插入到雙端隊列的同時,會調用上面的線程池來執行清理緩存的任務:

void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
            cleanupRunning = true;
            // 使用線程池執行清理任務
            executor.execute(cleanupRunnable);
        }
        // 將新建的鏈接插入到雙端隊列中
        connections.add(connection);
    }
複製代碼

這裏的清理任務是 cleanupRunnable,是一個 Runnable 類型的實例。它會在方法內部調用 cleanup() 方法來清理無效的鏈接:

private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
            while (true) {
                long waitNanos = cleanup(System.nanoTime());
                if (waitNanos == -1) return;
                if (waitNanos > 0) {
                    long waitMillis = waitNanos / 1000000L;
                    waitNanos -= (waitMillis * 1000000L);
                    synchronized (ConnectionPool.this) {
                        try {
                            ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                        } catch (InterruptedException ignored) {
                        }
                    }
                }
            }
        }
    };
複製代碼

下面是 cleanup() 方法:

long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;

        synchronized (this) {
            // 遍歷全部的鏈接
            for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
                RealConnection connection = i.next();
                // 當前的鏈接正在使用中
                if (pruneAndGetAllocationCount(connection, now) > 0) {
                    inUseConnectionCount++;
                    continue;
                }
                idleConnectionCount++;
                // 若是找到了一個能夠被清理的鏈接,會嘗試去尋找閒置時間最久的鏈接來釋放
                long idleDurationNs = now - connection.idleAtNanos;
                if (idleDurationNs > longestIdleDurationNs) {
                    longestIdleDurationNs = idleDurationNs;
                    longestIdleConnection = connection;
                }
            }

            if (longestIdleDurationNs >= this.keepAliveDurationNs 
                    || idleConnectionCount > this.maxIdleConnections) {
                // 該鏈接的時長超出了最大的活躍時長或者閒置的鏈接數量超出了最大容許的範圍,直接移除
                connections.remove(longestIdleConnection);
            } else if (idleConnectionCount > 0) {
                // 閒置的鏈接的數量大於0,停頓指定的時間(等會兒會將其清理掉,如今還不是時候)
                return keepAliveDurationNs - longestIdleDurationNs;
            } else if (inUseConnectionCount > 0) {
                // 全部的鏈接都在使用中,5分鐘後再清理
                return keepAliveDurationNs;
            } else {
                // 沒有鏈接
                cleanupRunning = false;
                return -1;
            }
        }

        closeQuietly(longestIdleConnection.socket());
        return 0;
    }
複製代碼

在從緩存的鏈接中取出鏈接來判斷是否應該將其釋放的時候使用到了兩個變量 maxIdleConnectionskeepAliveDurationNs,分別表示最大容許的閒置的鏈接的數量和鏈接容許存活的最長的時間。默認空閒鏈接最大數目爲5個,keepalive 時間最長爲5分鐘。

上面的方法會對緩存中的鏈接進行遍歷,以尋找一個閒置時間最長的鏈接,而後根據該鏈接的閒置時長和最大容許的鏈接數量等參數來決定是否應該清理該鏈接。同時注意上面的方法的返回值是一個時間,若是閒置時間最長的鏈接仍然須要一段時間才能被清理的時候,會返回這段時間的時間差,而後會在這段時間以後再次對鏈接池進行清理。

總結:

以上就是咱們對 OkHttp 內部網絡訪問的源碼的分析。當咱們發起一個請求的時候會初始化一個 Call 的實例,而後根據同步和異步的不一樣,分別調用它的 execute()enqueue() 方法。雖然,兩個方法一個會在當前的線程中被當即執行,一個會在線程池當中執行,可是它們進行網絡訪問的邏輯都是同樣的:經過攔截器組成的責任鏈,依次通過重試、橋接、緩存、鏈接和訪問服務器等過程,來獲取到一個響應並交給用戶。其中,緩存和鏈接兩部份內容是重點,由於前者涉及到了一些計算機網絡方面的知識,後者則是 OkHttp 效率和框架的核心。

相關文章
相關標籤/搜索