Android 主流開源框架(三)OkHttp 源碼解析

文章首發於個人我的博客:wildma的博客,這裏有更好的閱讀體驗,歡迎關注。html

前言

最近有個想法——就是把 Android 主流開源框架進行深刻分析,而後寫成一系列文章,包括該框架的詳細使用與源碼解析。目的是經過鑑賞大神的源碼來了解框架底層的原理,也就是作到不只要知其然,還要知其因此然。java

這裏我說下本身閱讀源碼的經驗,我通常都是按照平時使用某個框架或者某個系統源碼的使用流程入手的,首先要知道怎麼使用,而後再去深究每一步底層作了什麼,用了哪些好的設計模式,爲何要這麼設計。git

系列文章:github

更多幹貨請關注 AndroidNotesweb

1、OkHttp 的基本使用示例

1.1 同步 GET 請求

// (1)建立 OkHttpClient 對象
OkHttpClient client = new OkHttpClient();
 // (2)建立 Request 對象
Request request = new Request.Builder()
        .url(url)
        .build();
// (3)建立 Call 對象。
Call call = client.newCall(request);
// (4)發送請求並獲取服務器返回的數據
Response response = call.execute();
 // (5)取出相應的數據
String data = response.body().string();
複製代碼

1.2 異步 GET 請求

// (1)建立 OkHttpClient 對象
OkHttpClient client = new OkHttpClient();
// (2)建立 Request 對象
Request request = new Request.Builder()
        .url(url)
        .build();
// (3)建立 Call 對象。
Call call = client.newCall(request);
// (4)發送請求並獲取服務器返回的數據
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
    }
    @Override
    public void onResponse(Call call, Response response) throws IOException {
        // (5)取出相應的數據
        String data = response.body().string();
    }
});
複製代碼

能夠看到不論是同步請求仍是異步請求,OkHttp 的基本使用就只有 5 步。同步請求與異步請求惟一不一樣的就是第 (4) 步,前者使用同步方法 execute(),後者使用異步方法 enqueue()。接下來咱們就根據這 5 步進行源碼閱讀。算法

更多 OkHttp 的使用方法能夠看我以前寫的文章 Android 主流開源框架(二)OkHttp 使用詳解設計模式

2、OkHttp 源碼分析

源碼版本:3.11.0數組

2.1 (1)建立 OkHttpClient 對象

OkHttpClient client = new OkHttpClient();
複製代碼

首先咱們點擊建立的 OkHttpClient 對象進去源碼是這樣的:緩存

/*OkHttpClient*/
  public OkHttpClient() {
    this(new Builder());
}
複製代碼

而後是走了有參構造:服務器

/*OkHttpClient*/
  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;
    this.proxy = builder.proxy;
    this.protocols = builder.protocols;
    this.connectionSpecs = builder.connectionSpecs;
    this.interceptors = Util.immutableList(builder.interceptors);
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
    this.eventListenerFactory = builder.eventListenerFactory;
    this.proxySelector = builder.proxySelector;
    this.cookieJar = builder.cookieJar;
    this.cache = builder.cache;
    this.internalCache = builder.internalCache;
    this.socketFactory = builder.socketFactory;

    boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
      this.certificateChainCleaner = builder.certificateChainCleaner;
    } else {
      X509TrustManager trustManager = Util.platformTrustManager();
      this.sslSocketFactory = newSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }

    if (sslSocketFactory != null) {
      Platform.get().configureSslSocketFactory(sslSocketFactory);
    }

    this.hostnameVerifier = builder.hostnameVerifier;
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
        certificateChainCleaner);
    this.proxyAuthenticator = builder.proxyAuthenticator;
    this.authenticator = builder.authenticator;
    this.connectionPool = builder.connectionPool;
    this.dns = builder.dns;
    this.followSslRedirects = builder.followSslRedirects;
    this.followRedirects = builder.followRedirects;
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
    this.connectTimeout = builder.connectTimeout;
    this.readTimeout = builder.readTimeout;
    this.writeTimeout = builder.writeTimeout;
    this.pingInterval = builder.pingInterval;

    if (interceptors.contains(null)) {
      throw new IllegalStateException("Null interceptor: " + interceptors);
    }
    if (networkInterceptors.contains(null)) {
      throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
    }
  }
複製代碼

能夠看到有不少常量,這裏使用了建造者模式,因此這些常量能夠經過 build() 進行配置。若是不進行配置則使用無參構造中傳進來的默認配置,每一個常量的意思具體以下:

/*OkHttpClient*/
    public Builder() {
      dispatcher = new Dispatcher();// 分發器
      protocols = DEFAULT_PROTOCOLS;// HTTP 協議
      connectionSpecs = DEFAULT_CONNECTION_SPECS;// 傳輸層版本和鏈接協議
      eventListenerFactory = EventListener.factory(EventListener.NONE);// 事件監聽工廠
      proxySelector = ProxySelector.getDefault();// 代理選擇器
      cookieJar = CookieJar.NO_COOKIES;// cookie
      socketFactory = SocketFactory.getDefault();// socket 工廠
      hostnameVerifier = OkHostnameVerifier.INSTANCE;// 主機名字確認
      certificatePinner = CertificatePinner.DEFAULT;// 證書鏈
      proxyAuthenticator = Authenticator.NONE;// 代理服務器身份驗證
      authenticator = Authenticator.NONE;// 源服務器身份驗證
      connectionPool = new ConnectionPool();// 鏈接池
      dns = Dns.SYSTEM;// 域名
      followSslRedirects = true;// 是否遵循 ssl 重定向
      followRedirects = true;// 是否遵循重定向
      retryOnConnectionFailure = true;// 鏈接失敗的時候是否重試
      connectTimeout = 10_000;// 鏈接超時
      readTimeout = 10_000;// 讀超時
      writeTimeout = 10_000;// 寫超時
      pingInterval = 0;// HTTP / 2 和 Web 套接字 ping 之間的時間間隔
    }
複製代碼

2.2 (2)建立 Request 對象

Request request = new Request.Builder()
        .url(url)
        .build();
複製代碼

能夠看到,這裏一樣使用了建造者模式,咱們點擊 Request 進去看看:

/*Request*/
  //...
  final HttpUrl url;
  final String method;
  final Headers headers;
  final @Nullable RequestBody body;
  final Map<Class<?>, Object> tags;
  //...
複製代碼

發現 Request 仍是比較簡單的,只是用來設置一些請求連接(url)、請求方法(method)、請求頭(headers)、請求體(body)、標籤(tag,可做爲取消請求的標記)。

2.3 (3)建立 Call 對象

Call call = client.newCall(request);
複製代碼

咱們點擊 newCall() 方法進去看看:

/*OkHttpClient*/
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
複製代碼

發現是調用了 RealCall 的 newRealCall() 方法,並傳入了 OkHttpClient 與 Request 對象。

再跟進到 newRealCall() 方法:

/*RealCall*/
  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }
複製代碼

發現是建立了一個 RealCall 對象,並返回給上一層。RealCall 是 Call 的實現類,Call 定義了請求相關的操做,例如同步異步、取消請求等方法。因此後續的請求相關操做基本都是在調用 Call 定義的方法,而這些方法真正的執行是它的實現類 RealCall。

最後看看 RealCall 的構造函數,該函數是比較簡單的,只是賦值一些常量,而後建立了重試與重定向攔截器(RetryAndFollowUpInterceptor)(這個後面會講):

/*RealCall*/
  private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
  }
複製代碼

2.4 (4)發送請求並獲取服務器返回的數據

前面咱們已經說了,同步請求與異步請求惟一不一樣的就是第 (4) 步,前者使用同步方法 execute(),後者使用異步方法 enqueue()。因此咱們分 2 種狀況來說。

2.4.1 同步請求

Response response = call.execute();
複製代碼

咱們點擊 execute() 方法進去看看:

/*RealCall*/
  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);// (1)
      Response result = getResponseWithInterceptorChain();// (2)
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);// (3)
    }
  }
複製代碼

源碼中我標註了 3 個關注點,點擊關注點(1)的 executed() 方法進去,能夠看到是將傳進來的 RealCall 加入了一個雙端隊列:

/*Dispatcher*/
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
複製代碼

其中 runningSyncCalls 是一個雙端隊列,用來記錄正在運行的同步請求隊列:

/*Dispatcher*/
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
複製代碼

關注點(2)返回了一個 Response,也就是服務器返回的數據,說明請求就是在這裏執行了,這個是咱們要研究的重點,放到後面再說。

點擊關注點(3)的 finished() 方法進去,是這樣的:

/*Dispatcher*/
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");// (1)
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
複製代碼

能夠看到關注點(1)calls.remove(call) 只是把當前 RealCall 又從正在運行的同步請求隊列中移除了,說明請求已經完成了。

你應該注意到了,上面還有個 dispatcher 沒講到,其實這是一個分發器,是用來對請求進行分發的。咱們剛剛也分析了在同步請求中涉及到的 dispatcher 只是用來記錄正在運行的同步請求隊列,而後請求完成就移除掉。因此這個分發器主要用在異步請求中,咱們放到異步請求中再去講。

2.4.2 異步請求

call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
            }
        });
複製代碼

咱們點擊 enqueue() 方法進去看看:

/*RealCall*/
  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));// (1)
  }
複製代碼

前面幾行與同步請求源碼同樣,咱們點擊關注點(1)的 enqueue() 方法進去看看:

/*Dispatcher*/
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
複製代碼

能夠看到這裏面涉及到不少 Dispatcher 對象裏面的常量與變量,因此也能看出 Dispatcher 主要用在異步請求中。先看下 Dispatcher 對象裏面的常量與變量:

/*Dispatcher*/
  // 最大併發請求數
  private int maxRequests = 64;
  // 每一個主機最大請求數
  private int maxRequestsPerHost = 5;
  // 每次調度程序變爲空閒時調用的回調
  private @Nullable Runnable idleCallback;
  // 用來執行異步任務的線程池
  private @Nullable ExecutorService executorService;
  // 準備中的異步請求隊列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 正在運行的異步請求隊列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  // 正在運行的同步請求隊列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
複製代碼

弄懂了這些常量與變量的意思,就很好理解上面關注點(1)的 enqueue() 方法了,即若是 」正在運行的異步請求隊列數「 小於 」最大併發請求數「,而且 」每一個主機正在運行的請求數「 小於 」每一個主機最大請求數「,則將當前請求繼續加入 」正在運行的異步請求隊列「 並在線程池中執行,不然將當前請求加入 」準備中的異步請求隊列「。

咱們看到線程池中還傳了一個 AsyncCall 進去,點擊進去看看:

/*RealCall*/
  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();// (1)
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));// (2)
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);// (3)
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);// (4)
        }
      } finally {
        client.dispatcher().finished(this);// (5)
      }
    }
  }
複製代碼

發現他是 RealCall 的內部類,繼承 NamedRunnable,實現了 Runnable。裏面一樣執行了 execute() 方法,仔細看這個方法與以前咱們閱讀同步請求中的 execute() 相似,關注點(1)(5)都是同樣的,不一樣的是多了 2 個回調,由於是異步請求,因此須要把最終返回的結果經過 responseCallback 回調到最外層咱們使用的地方去,其中(2)(4)是失敗的回調,(3)是成功的回調。

到這裏,OkHttp 基本使用的第(4)步除了 getResponseWithInterceptorChain() 方法,其餘都看完了,下面就重點閱讀這個方法。

2.4.3 攔截器

點擊 getResponseWithInterceptorChain() 方法進去看看:

/*RealCall*/
  Response getResponseWithInterceptorChain() throws IOException {
    // 建立一個攔截器集合
    List<Interceptor> interceptors = new ArrayList<>();
    // 添加用戶自定義的攔截器
    interceptors.addAll(client.interceptors());
    // 添加劇試與重定向攔截器
    interceptors.add(retryAndFollowUpInterceptor);
    // 添加橋攔截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    // 添加緩存攔截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    // 添加鏈接攔截器
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
    // 添加用戶自定義的網絡攔截器
      interceptors.addAll(client.networkInterceptors());
    }
    // 添加服務器請求攔截器
    interceptors.add(new CallServerInterceptor(forWebSocket));
    // (1) 構建責任鏈
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    // (2) 處理責任鏈中的攔截器
    return chain.proceed(originalRequest);
  }
複製代碼

能夠看到,這裏用到了不少攔截器,將這些攔截器構建成一條責任鏈,而後再一個個處理。這裏用到了責任鏈模式,每一個攔截器負責相應的功能,上一個攔截器完成會傳給下一個攔截器,直到最後一個攔截器執行完再一層層向上返回 Response。

咱們先驗證下這個責任鏈的執行過程是否跟我說的同樣,而後再看看每一個攔截器的具體做用。這裏我標記了 2 個關注點: 關注點(1)是構建一條責任鏈,並把責任鏈須要用到的參數傳過去,其中參數 5 爲責任鏈的索引,這裏傳 「0」 表示當前正在處理第一個攔截器。

關注點(2)是處理責任鏈中的攔截器,點擊 proceed() 方法進去看看:

/*RealInterceptorChain*/
  @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    //(1)start
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    // (1)end

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }
複製代碼

能夠看到,除了一些判斷只須要看關注點(1)便可。這裏會構建一個新的責任鏈,而後把責任鏈的索引加 1(爲了下次從攔截器集合中取出下一個攔截器),接着從攔截器集合中取出當前攔截器並調用 intercept() 方法,這樣若是這個攔截器能夠完成任務會立刻返回 Response,不然會在 intercept() 方法中繼續處理責任鏈,由於該 intercept() 方法中會繼續調用責任鏈的 proceed() 方法。看完源碼確實跟咱們以前設想的同樣的,接下來咱們看看每一個攔截器的具體做用。

2.4.3.1 重試與重定向攔截器(RetryAndFollowUpInterceptor)

該攔截器主要負責失敗後重連以及重定向,從前面的 proceed() 方法可知,每一個攔截器被調用的方法都是 intercept() 方法,因此閱讀攔截器的入口就是該方法。

重試與重定向攔截器中的 intercept() 方法以下:

/*RetryAndFollowUpInterceptor*/
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    // (1) 建立 StreamAllocation 對象,用來協調三個實體(Connections、Streams、Calls)之間的關係
    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    // 重定向次數
    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
       //(2)執行下一個攔截器
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        //(3)發生 Route 異常,則嘗試進行恢復
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getFirstConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        //(4)發生 IO 異常,則嘗試進行恢復
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // 若是中途出現異常,則釋放全部資源
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // 構建 body 爲空的響應體
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Request followUp;
      try {
     // (5)檢查是否須要重定向,不須要則 followUp 返回 null
        followUp = followUpRequest(response, streamAllocation.route());
      } catch (IOException e) {
        streamAllocation.release();
        throw e;
      }

      // (6)不須要重定向,則返回以前的 response
      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }
      // 關閉資源
      closeQuietly(response.body());

      // 重定向次數大於最大值,則釋放 StreamAllocation 並拋出異常
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }
	
      // 若是該請求沒法複用以前的鏈接,則釋放後從新建立
      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }
複製代碼

該方法的註釋都寫的比較詳細了,咱們重點看下我標記的關注點。

  • (1):建立 StreamAllocation 對象,StreamAllocation 至關於一個管理類,用來協調三個實體(Connections、Streams、Calls)之間的關係。這裏還傳了一個 client.connectionPool(),它是第一步建立 OkHttpClient 對象的時候建立的,是一個鏈接池。它們會在後面的鏈接攔截器(ConnectInterceptor)中才被真正的使用到,後面會講。 其中:
    Connections:鏈接到遠程服務器的物理套接字。
    Streams:在鏈接上分層的邏輯 http 請求/響應對。
    Calls:流的邏輯序列,一般是初始請求以及它的重定向請求。

  • (2):是執行下一個攔截器,按順序調用那就是 BridgeInterceptor。

  • (3)(4):發生 Route 或 IO 異常,則進行重試,咱們看看重試的相關方法:

/*RetryAndFollowUpInterceptor*/
  private boolean recover(IOException e, StreamAllocation streamAllocation, boolean requestSendStarted, Request userRequest) {
    streamAllocation.streamFailed(e);

    // 客戶端配置了出錯再也不重試
    if (!client.retryOnConnectionFailure()) return false;

    // 沒法再次發送 request body
    if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;

    // 發生 isRecoverable() 方法中出現的異常
    if (!isRecoverable(e, requestSendStarted)) return false;

    // 沒有更多的路線可供嘗試
    if (!streamAllocation.hasMoreRoutes()) return false;

    // For failure recovery, use the same route selector with a new connection.
    return true;
  }

  private boolean isRecoverable(IOException e, boolean requestSendStarted) {
    // 協議異常
    if (e instanceof ProtocolException) {
      return false;
    }

    // 中斷異常
    if (e instanceof InterruptedIOException) {
      return e instanceof SocketTimeoutException && !requestSendStarted;
    }

    // SSL握手異常
    if (e instanceof SSLHandshakeException) {
      // If the problem was a CertificateException from the X509TrustManager,
      // do not retry.
      if (e.getCause() instanceof CertificateException) {
        return false;
      }
    }
 // SSL握手未受權異常
    if (e instanceof SSLPeerUnverifiedException) {
      // e.g. a certificate pinning error.
      return false;
    }

    // An example of one we might want to retry with a different route is a problem connecting to a
    // proxy and would manifest as a standard IOException. Unless it is one we know we should not
    // retry, we return true and try a new route.
    return true;
  }
複製代碼

能夠看到嘗試進行重試的時候,若是出現如下狀況則不會重試:

  • 客戶端配置了出錯再也不重試

  • 沒法再次發送 request body

  • 發生 ProtocolException(協議異常)、InterruptedIOException(中斷異常)、SSLHandshakeException(SSL握手異常)、SSLPeerUnverifiedException(SSL握手未受權異常)中的任意一個異常

  • 沒有更多的路線可供嘗試

  • (5)(6):檢查是否須要重定向,若是不須要則返回以前的 response,須要則進行重定向,也就是繼續循環請求重試。是否須要重定向主要根據響應碼來決定,具體能夠去看看 followUpRequest() 方法,這裏就不貼代碼了。

ps:若是你想拿重定向的域名來跟一遍源碼中重定向的流程,那麼你能夠試試郭霖的域名(http://guolin.tech), 該域名會重定向到他的 csdn 博客(https://blog.csdn.net/guolin_blog), 走一遍流程會讓你對源碼中重定向的原理有更深的理解。

2.4.3.2 橋攔截器(BridgeInterceptor)

該攔截器至關於一個橋樑,首先將用戶的請求轉換爲發給服務器的請求,而後使用該請求訪問網絡,最後將服務器返回的響應轉換爲用戶可用的響應。

咱們看看該攔截器中的 intercept() 方法:

/*BridgeInterceptor*/
  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    //(1)將用戶的請求轉換爲發給服務器的請求-start
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

   // 若是咱們在建立 Request 的時候添加了 "Accept-Encoding: gzip" 請求頭,那麼要本身負責解壓縮傳輸流。
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
     // 默認是 gzip 壓縮
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
    //(1)將用戶的請求轉換爲發給服務器的請求-end

    //(2)執行下一個攔截器進行網絡請求
    Response networkResponse = chain.proceed(requestBuilder.build());

    //(3)將服務器返回的響應轉換爲用戶可用的響應-start
    // 解析服務器返回的 header
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    // gzip 解壓
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }
    //(3)將服務器返回的響應轉換爲用戶可用的響應-end

    return responseBuilder.build();
  }
複製代碼

根據我標記的關注點大概就是:

  • (1):將用戶的請求轉換爲發給服務器的請求。主要是添加一些默認的請求頭,例如 Content-Type、Content-Length、Transfer-Encoding、Host、Connection。由於咱們在建立 Request 的時候能夠不添加任何請求頭,若是這裏不加上一些默認的請求頭是沒法完成請求的。
  • (2):執行下一個攔截器進行網絡請求。
  • (3):將服務器返回的響應轉換爲用戶可用的響應。主要是解析服務器返回的 header,進行 gzip 解壓。

2.4.3.3 緩存攔截器(CacheInterceptor)

該攔截器主要用來實現緩存的讀取和存儲,即進行網絡請求的時候執行到緩存攔截器會先判斷是否有緩存,若是有會直接返回緩存,沒有則會執行後面的攔截器繼續請求網絡,請求成功會將請求到的數據緩存起來。

咱們看看該攔截器中的 intercept() 方法:

/*CacheInterceptor*/
  @Override public Response intercept(Chain chain) throws IOException {
    //(1)經過 Request 獲得緩存
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    //(2)經過緩存策略獲取是使用緩存仍是使用網絡請求,或者 2 者同時使用或都不使用
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    // 有緩存,可是策略中不使用緩存,須要釋放資源
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body());
    }

    // (3)若是策略中不使用網絡請求,也不使用緩存,那麼直接返回失敗
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    //(4)若是策略中不使用網絡請求,執行到這裏說明是使用緩存的,則直接返回緩存
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      //(5)執行下一個攔截器進行網絡請求
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // 若是發生 IO 或者其餘崩潰,爲了避免泄漏緩存體,須要釋放資源
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    //(6)若是策略中使用緩存,而且響應碼爲 304,則返回緩存
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        cache.trackConditionalCacheHit();
        // 更新緩存
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        //(7)將請求返回的結果存進緩存
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }
複製代碼

根據我標記的關注點大概流程就是:

  • (1):經過 Request 獲得緩存。這裏的 cache 是 InternalCache,可是由於 InternalCache 是一個接口,並且只有一個實現類 Cache,因此 cache 其實就是 Cache。進入 Cache 能夠發現它底層使用的是 DiskLruCache 緩存機制,也就是使用 「最近最少使用」 算法將數據緩存到磁盤內。
  • (2):經過緩存策略獲取是使用緩存仍是使用網絡請求,或者 2 者同時使用或都不使用。networkRequest 爲 null 表示不使用網絡請求,cacheResponse 爲 null 表示不使用緩存。
  • (3):若是策略中不使用網絡請求,也不使用緩存,那麼直接返回失敗。這樣就直接中止了後面攔截器的執行,結束了整個請求。
  • (4):若是策略中不使用網絡請求,執行到這裏說明是使用緩存的,則直接返回緩存。這樣就直接中止了後面攔截器的執行,結束了整個請求。
  • (5):執行到這裏,說明須要從網絡獲取數據,則會繼續執行下一個攔截器進行網絡請求。
  • (6):若是策略中使用緩存,而且響應碼爲 304,則返回緩存,而且更新緩存。
  • (7):最後將請求返回的結果進行緩存。

2.4.3.4 鏈接攔截器(ConnectInterceptor)

該攔截器主要用來打開與目標服務器的鏈接,而後繼續執行下一個攔截器。

咱們看看該攔截器中的 intercept() 方法:

/*ConnectInterceptor*/
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
 	//(1)獲取 StreamAllocation
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
 	//(2)建立 HttpCodec
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
 	//(3)獲取 RealConnection
    RealConnection connection = streamAllocation.connection();
 	//(4)執行下一個攔截器
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
複製代碼

根據我標記的關注點大概流程就是:

  • (1):獲取 StreamAllocation,這裏獲取的其實就是第一個攔截器 RetryAndFollowUpInterceptor 中建立的。
  • (2):建立 HttpCodec,是經過 StreamAllocation 的 newStream() 方法獲取的,咱們看下 newStream() 方法:
/*StreamAllocation*/
    public HttpCodec newStream( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
   	  //(5)尋找可用的鏈接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      //(6)經過這個可用的鏈接建立 HttpCodec
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
複製代碼

咱們看下關注點(5)中的 findHealthyConnection() 方法:

/*StreamAllocation*/
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
   	  //(7)尋找一個鏈接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // 若是這是一個全新的鏈接,則不須要後面的健康檢查,而是在這裏直接返回鏈接
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // 若是不健康,則禁止建立新流,而且繼續循環查找可用的連接
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }
複製代碼

能夠看到,findHealthyConnection() 方法中又經過 findConnection() 方法去尋找,看下這個方法:

/*StreamAllocation*/
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      //(8)start
      // 嘗試使用已分配的鏈接
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
  		// 已經分配的鏈接,而且是可用的,則將該已分配的鏈接賦值爲可用的鏈接
        result = this.connection;
        releasedConnection = null;
      }
   	  //(8)end
      if (!reportedAcquired) {
        // 若是這個鏈接從未標記過已獲取,那麼請不要標記爲爲已發佈
        releasedConnection = null;
      }

      //(9)start 嘗試從鏈接池中獲取鏈接
      if (result == null) {
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // 若是找到一個可用的鏈接,那麼直接返回
      return result;
    }

    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      //(10)根據不一樣的路由再次從鏈接池中獲取可用的鏈接
      if (newRouteSelection) {
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      //(11)仍是沒有找到可用的鏈接,那麼從新建立一個新的鏈接
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    // 若是在第二次找到了可用的鏈接,則直接返回
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    //(12)進行 TCP 和 TLS 握手
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      //(13)將新建立的鏈接放進鏈接池中
      Internal.instance.put(connectionPool, result);

      // 若是同時建立了到同一地址的另外一個多路複用鏈接,則釋放這個鏈接並獲取那個多路複用鏈接。
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }
複製代碼

經過上面的代碼分析,findConnection() 方法大概流程就是:

  • (8):判斷當前鏈接是否可用,可用則進行賦值,在後面直接返回
  • (9):若是當前鏈接不可用,那麼嘗試從鏈接池中獲取可用鏈接
  • (10):若是鏈接池中找不到可用的鏈接,那麼切換不一樣的路由再次從鏈接池中獲取可用的鏈接
  • (11):仍是沒有找到可用的鏈接,那麼只能從新建立一個新的鏈接
  • (12):進行 TCP 和 TLS 握手
  • (13):最後將新建立的鏈接放進鏈接池中

能夠看到,關注點(9)(13)分別是從鏈接池中取出鏈接和存入鏈接到鏈接池,分別調用的是 Internal.instance.get() 與 Internal.instance.put()。
咱們看下 get() 方法是怎樣的,點擊 get() 方法進去,發現 Internal 是一個抽象類,它有一個靜態的實例,在 OkHttpClient 的靜態代碼快中被初始化:

/*OkHttpClient*/
  static {
    Internal.instance = new Internal() {
    // 省略部分代碼...
    
      @Override public RealConnection get(ConnectionPool pool, Address address, StreamAllocation streamAllocation, Route route) {
        return pool.get(address, streamAllocation, route);
      }
      
   // 省略部分代碼...
  }
複製代碼

能夠看到 Internal 的 get() 方法中調用的是 ConnectionPool(鏈接池)的 get() 方法,因此能夠確定這個鏈接池就是用來操做這些鏈接的,內部具體怎麼操做咱們放到後面去講,這裏只須要知道它能夠用來存取鏈接就能夠了。

關注點(12)其實就是與服務器創建鏈接的核心代碼,咱們看下這個方法:

/*RealConnection*/
  public void connect(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Call call, EventListener eventListener) {
    if (protocol != null) throw new IllegalStateException("already connected");

    /*線路選擇*/
    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    } else {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        throw new RouteException(new UnknownServiceException(
            "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
      }
    }

    while (true) {
      try {
        //(14)若是須要隧道鏈接,則進行隧道鏈接
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break;
          }
        } else {
          //(15)不須要隧道鏈接,則直接進行 socket 鏈接
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        // 創建協議
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
        // 鏈接結束
        eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
        break;
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;
  
        // 鏈接失敗
        eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }

    if (route.requiresTunnel() && rawSocket == null) {
      ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
          + MAX_TUNNEL_ATTEMPTS);
      throw new RouteException(exception);
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }
複製代碼

關注點(14)(15)最終都會調用 connectSocket() 方法:

/*RealConnection*/
  private void connectSocket(int connectTimeout, int readTimeout, Call call, EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    // 建立 socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    eventListener.connectStart(call, route.socketAddress(), proxy);
    // 設置 socket 超時時間
    rawSocket.setSoTimeout(readTimeout);
    try {
      //(16)進行 socket 鏈接
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }
複製代碼

能夠看到 okhttp 底層是經過 socket 進行鏈接的。

看完關注點(5)中的 findHealthyConnection() 方法,咱們繼續回去看關注點(6)的方法:

/*StreamAllocation*/
   public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, chain, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
  }
複製代碼

該方法是建立 HttpCodec,HttpCodec 的做用主要是進行 HTTP 請求和響應的編碼與解碼操做。它有兩個實現類,分別是 Http1Codec 與 Http2Codec,這裏主要判斷若是是 HTTP/2,則建立 Http2Codec,不然建立 Http1Codec。

  • (3):繼續回去看關注點(3),點擊 connection() 方法進去發現,這裏獲取的 RealConnection 其實就是關注點(7) findConnection() 方法中從鏈接池中取出鏈接或從新建立的鏈接。

  • (4):關注點(4)則拿到鏈接後繼續執行下一個攔截器。

2.4.3.5 服務器請求攔截器(CallServerInterceptor)

該攔截器主要用來向服務器發起請求並獲取數據,它是責任鏈中的最後一個攔截器,獲取到服務器的數據後會直接返回給上一個攔截器。

咱們看看該攔截器中的 intercept() 方法:

/*CallServerInterceptor*/
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    // 獲取 ConnectInterceptor 中建立的 HttpCodec
    HttpCodec httpCodec = realChain.httpStream();
    // 獲取 RetryAndFollowUpInterceptor 中建立的 StreamAllocation
    StreamAllocation streamAllocation = realChain.streamAllocation();
    // 獲取 ConnectInterceptor 中新建立或者從鏈接池中拿到的 RealConnection
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    //(1)寫入請求頭
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      //(2)寫入請求體
      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      //(3)讀取響應頭
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      responseBuilder = httpCodec.readResponseHeaders(false);

      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);

    //(4)讀取響應體
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
複製代碼

能夠看到,這個攔截器仍是比較簡單的,上一個攔截器 ConnectInterceptor 已經鏈接到服務器了並建立了 HttpCodec 對象,HttpCodec 對象封裝了 okio 提供的輸出流(BufferedSink)與輸入流(BufferedSource),因此這裏就主要經過 HttpCodec 對象與服務器進行讀寫操做。例如寫入請求頭與請求體,讀取響應頭與響應體。

2.4.4 ConnectionPool(鏈接池)

  • 簡介 鏈接池是用來管理 HTTP 和 HTTP / 2 鏈接的複用,以減小網絡延遲。從上面咱們閱讀 findConnection() 方法源碼也能夠得出,即若是從鏈接池中找到了可用的鏈接,那麼就不用從新建立新的鏈接,也省去了 TCP 和 TLS 握手。

  • ConnectionPool 類中的主要常量

/*ConnectionPool*/
  // 線程池,用於清除過時的鏈接
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  // 最大容許空閒的鏈接數量
  private final int maxIdleConnections;
  // 鏈接的存活時間
  private final long keepAliveDurationNs;
  // 清理任務,用來清理無效的鏈接
  private final Runnable cleanupRunnable = new Runnable() {
  //...
  };
  // 用來記錄鏈接的雙端隊列
  private final Deque<RealConnection> connections = new ArrayDeque<>();
複製代碼
  • 構造函數
/*ConnectionPool*/
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }
複製代碼

能夠看到構造函數設置了默認的最大容許空閒的鏈接數量爲 5 個,鏈接的存活時間爲 5 分鐘。

  • 主要函數 這裏主要講下前面鏈接攔截器中用到的 get()、put() 方法。

get() 方法:

/*ConnectionPool*/
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }
複製代碼

該方法是從鏈接池中獲取可複用的鏈接,這裏的邏輯是遍歷記錄鏈接的雙端隊列,取出可複用的鏈接。

put() 方法:

/*ConnectionPool*/
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      // 執行清理任務
      executor.execute(cleanupRunnable);
    }
    // 將新建立的鏈接添加進記錄鏈接的雙端隊列中
    connections.add(connection);
  }
複製代碼

該方法是將新建立的鏈接放進鏈接池中,這裏的邏輯是先清理無效的鏈接,而後再將新建立的鏈接添加進記錄鏈接的雙端隊列中。

咱們先看下清理任務:

/*ConnectionPool*/
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        // 清理無效鏈接
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
複製代碼

這是一個阻塞的清理任務,而且經過無限循環來清理。這裏首先調用 cleanup() 方法清理無效鏈接,並返回下次須要清理的間隔時間,而後調用 wait() 方法進行等待以釋放鎖與時間片,當等待時間到了後,再次循環清理。

咱們看下 cleanup() 方法:

/*ConnectionPool*/
  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // 遍歷鏈接,找出無效鏈接進行清理
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //(1)查詢此鏈接的 StreamAllocation 的引用數量,大於 0 則 inUseConnectionCount 加 1,不然 idleConnectionCount 加 1。
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // 標記空閒鏈接
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // 若是鏈接存活時間大於等於 5 分鐘,或者空閒的鏈接數量大於 5 個,則將該連接從隊列中移除
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // 若是空閒的鏈接數量大於 0,返回此鏈接即將到期的時間
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // 若是沒有空閒鏈接,則返回 5 分鐘,也就是下次須要清理的間隔時間爲 5 分鐘
        return keepAliveDurationNs;
      } else {
        // 沒有任何鏈接,則跳出循環
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // 立刻進行下一次清理
    return 0;
  }
複製代碼

能夠看到,這裏主要經過判斷鏈接存活時間是否大於等於 5 分鐘,或者空閒的鏈接數量是否大於 5 個來進行鏈接的清理。鏈接是否空閒是經過關注點(1)中的 pruneAndGetAllocationCount() 方法來判斷的,咱們看下這個方法:

/*ConnectionPool*/
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    // 得到 allocations 的弱引用列表
    List<Reference<StreamAllocation>> references = connection.allocations;
    // 遍歷 allocations 的弱引用列表
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      // 說明 StreamAllocation 被使用,則繼續下一次循環
      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      // 說明 StreamAllocation 沒有被使用,則從列表中移除
      references.remove(i);
      connection.noNewStreams = true;

      // 列表爲空,說明都被移除了,這個時候返回 allocationCount 爲 0,表示該鏈接是空閒的。
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    // 列表不爲空,返回列表的大小,大於 0 表示該鏈接是在使用的。
    return references.size();
  }
複製代碼

該方法比較簡單,主要是遍歷 allocations 的弱引用列表,若是 StreamAllocation 沒有被使用,則從列表中移除,最後返回該列表的大小,經過該大小便可判斷是不是空閒鏈接,小於等於 0 纔是空閒鏈接。

2.5 (5)取出相應的數據

String data = response.body().string();
複製代碼

在第(4)步同步請求或者異步請求執行完都會返回 Response,這個就是最終返回的數據,能夠經過它獲取到 code、message、header、body 等。

這裏講下 body,點擊 body() 進去是這樣的:

/*Response*/
  public @Nullable ResponseBody body() {
    return body;
  }
複製代碼

能夠看到這裏的 body 就是 ResponseBody,它是一個抽象類,不能被實例化,通常用它的子類 RealResponseBody 進行實例化。它是在前面講的 「2.4.3.5 服務器請求攔截器(CallServerInterceptor)」 小節中賦值的:

/*CallServerInterceptor*/
    if (forWebSocket && code == 101) {
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
	  // openResponseBody() 方法中建立了 RealResponseBody 對象返回
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
複製代碼

若是有緩存則會在緩存攔截器(CacheInterceptor)中賦值。

ResponseBody 中經常使用的方法有以下幾種:

/*ResponseBody*/
  public final String string() throws IOException {
    BufferedSource source = source();
    try {
      Charset charset = Util.bomAwareCharset(source, charset());
      return source.readString(charset);
    } finally {
      Util.closeQuietly(source);
    }
  }

  public final InputStream byteStream() {
    return source().inputStream();
  }

  public final byte[] bytes() throws IOException {
    long contentLength = contentLength();
    if (contentLength > Integer.MAX_VALUE) {
      throw new IOException("Cannot buffer entire body for content length: " + contentLength);
    }

    BufferedSource source = source();
    byte[] bytes;
    try {
      bytes = source.readByteArray();
    } finally {
      Util.closeQuietly(source);
    }
    if (contentLength != -1 && contentLength != bytes.length) {
      throw new IOException("Content-Length ("
          + contentLength
          + ") and stream length ("
          + bytes.length
          + ") disagree");
    }
    return bytes;
  }
複製代碼

能夠看到,這三個方法內部都調用了 source() 來獲取 BufferedSource,BufferedSource 就是 okio 提供的輸入流,拿到輸入流就能夠將 body 數據轉換爲你須要的類型。例如:
但願返回 String,則調用 response.body().string(),適用於不超過 1 MB 的數據。
但願返回輸入流,則調用 response.body().byteStream(),適用於超過 1 MB 的數據,例以下載文件。
但願返回二進制字節數組,則調用 response.body().bytes()。

須要注意的是,response.body().string() 只能調用一次,不然會拋出以下異常:

W/System.err: java.lang.IllegalStateException: closed
W/System.err:     at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:408)
W/System.err:     at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:402)
W/System.err:     at okhttp3.internal.Util.bomAwareCharset(Util.java:469)
W/System.err:     at okhttp3.ResponseBody.string(ResponseBody.java:175)
複製代碼

根據報錯日誌能夠看到,是在 RealBufferedSource 類的 408 行報的錯,咱們跳轉過去看看:

/*RealBufferedSource*/
  @Override
  public boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
	//...
  }
複製代碼

能夠看到,這裏作了個判斷,closed 爲 true 就拋出該異常,繼續跟蹤 closed 賦值的地方:

/*RealBufferedSource*/
  @Override public void close() throws IOException {
    if (closed) return;
    closed = true;
    source.close();
    buffer.clear();
  }
複製代碼

能夠看到,closed 惟一賦值的地方在 close() 方法中,而該方法正是 string() 方法中的 Util.closeQuietly(source); 調用的:

/*ResponseBody*/
  public static void closeQuietly(Closeable closeable) {
    if (closeable != null) {
      try {
        closeable.close();
      } catch (RuntimeException rethrown) {
        throw rethrown;
      } catch (Exception ignored) {
      }
    }
  }
複製代碼

到這裏咱們就明白了爲何 response.body().string() 只能調用一次的緣由,即 string() 方法中獲取到 String後又調用了 Util.closeQuietly(source) 方法關閉了輸入流,而且標記 closed 爲 true,而後第二次調用 string() 方法的時候會在 RealBufferedSource.rangeEquals() 方法進行判斷,爲 true 就拋出異常。

這樣設計的緣由是服務器返回的 body 可能會很大,因此 OkHttp 不會將其存儲在內存中,只有當你須要的時候纔去獲取它,若是沒有新的請求則沒法獲取 2 次。

3、總結

看完源碼,發現 OkHttp 是一個設計得很是優秀的框架。該框架運用了不少設計模式,例如建造者模式、責任鏈模式等等。知道了 OkHttp 的核心是攔截器,這裏採用的就是責任鏈模式,每一個攔截器負責相應的功能,發起請求的時候由上往下依次執行每一個攔截器,響應的數據則層層往上傳遞。

參考資料:

相關文章
相關標籤/搜索