你想要的系列:網絡請求框架OkHttp3全解系列 - (四)攔截器詳解2:鏈接、請求服務(重點)

Okhttp系列文章:java

你想要的系列:網絡請求框架OkHttp3全解系列 - (一)OkHttp的基本使用json

你想要的系列:網絡請求框架OkHttp3全解系列 - (二)OkHttp的工做流程分析緩存

你想要的系列:網絡請求框架OkHttp3全解系列 - (三)攔截器詳解1:重試重定向、橋、緩存(重點)安全

你想要的系列:網絡請求框架OkHttp3全解系列 - (四)攔截器詳解2:鏈接、請求服務(重點)服務器


在本系列的上一篇文章你想要的系列:網絡請求框架OkHttp3全解系列 - (三)攔截器詳解1:重試重定向、橋、緩存(重點)中,咱們分析了OkHttp攔截器鏈中的前三個攔截器:RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor,它們在請求創建鏈接以前作了一些預處理。markdown

若是請求通過這三個攔截器後,要繼續往下傳遞,說明是須要進行網絡請求的(緩存不能直接知足),也就是今天要分析的內容——剩下的兩個攔截器:ConnectInterceptor、CallServerInterceptor,分別負責 鏈接創建請求服務讀寫網絡

背景 - HTTP協議發展

在講解攔截器以前,咱們有必要了解http協議相關背景知識,由於okhttp的網絡鏈接正是基於此實現的。HTTP協議經歷瞭如下三個版本階段。架構

HTTP1.0

HTTP1.0中,一次請求 會創建一個TCP鏈接,請求完成後主動斷開鏈接。這種方法的好處是簡單,各個請求互不干擾。 但每次請求都會經歷 3次握手、2次或4次揮手的鏈接創建和斷開過程——極大影響網絡效率和系統開銷。 app

http1.0

HTTP1.1

HTTP1.1中,解決了HTTP1.0中鏈接不能複用的問題,支持持久鏈接——使用keep-alive機制:一次HTTP請求結束後不會當即斷開TCP鏈接,若是此時有新的HTTP請求,且其請求的Host同上次請求相同,那麼會直接複用TCP鏈接。這樣就減小了創建和關閉鏈接的消耗和延遲。keep-alive機制在HTTP1.1中是默認打開的——即在請求頭添加:connection:keep-alive。(keep-alive不會永久保持鏈接,它有一個保持時間,可在不一樣的服務器軟件(如Apache)中設定這個時間) 框架

http1.1

HTTP2.0

HTTP1.1中,鏈接的複用是串行的:一個請求創建了TCP鏈接,請求完成後,下一個相同host的請求繼續使用這個鏈接。 但客戶端想 同時 發起多個並行請求,那麼必須創建多個TCP鏈接。將會產生網絡延遲、增大網路開銷。

而且HTTP1.1不會壓縮請求和響應報頭,致使了沒必要要的網絡流量;HTTP1.1不支持資源優先級致使底層TCP鏈接利用率低下。在HTTP2.0中,這些問題都會獲得解決,HTTP2.0主要有如下特性

  • 新的二進制格式(Binary Format):http/1.x使用的是明文協議,其協議格式由三部分組成:request line,header,body,其協議解析是基於文本,可是這種方式存在自然缺陷,文本的表現形式有多樣性,要作到健壯性考慮的場景必然不少,二進制則不一樣,只認0和1的組合;基於這種考慮,http/2.0的協議解析決定採用二進制格式,實現方便且健壯
  • 多路複用(MultiPlexing):即鏈接共享,使用streamId用來區分請求,一個request對應一個stream並分配一個id,這樣一個TCP鏈接上能夠有多個stream,每一個stream的frame能夠隨機的混雜在一塊兒,接收方能夠根據stream id將frame再歸屬到各自不一樣的request裏面
  • 優先級和依賴(Priority、Dependency):每一個stream均可以設置優先級和依賴,優先級高的stream會被server優先處理和返回給客戶端,stream還能夠依賴其它的sub streams;優先級和依賴都是能夠動態調整的,好比在APP上瀏覽商品列表,用戶快速滑動到底部,可是前面的請求已經發出,若是不把後面的優先級設高,那麼當前瀏覽的圖片將會在最後展現出來,顯然不利於用戶體驗
  • header壓縮:http2.0使用encoder來減小須要傳輸的header大小,通信雙方各自cache一份header fields表,既避免了重複header的傳輸,又減少了須要傳輸的大小
  • 重置鏈接:不少APP裏都有中止下載圖片的需求,對於http1.x來講,是直接斷開鏈接,致使下次再發請求必須從新創建鏈接;http2.0引入RST_STREAM類型的frame,能夠在不斷開鏈接的前提下取消某個request的stream

其中涉及了兩個新的概念:

  • 數據流-stream:基於TCP鏈接之上的邏輯雙向字節流,用於承載雙向消息,對應一個請求及其響應。客戶端每發起一個請求就創建一個數據流,後續該請求及其響應的全部數據都經過該數據流傳輸。每一個數據流都有一個惟一的標識符和可選的優先級信息。
  • 幀-frame:HTTP/2的最小數據切片單位,承載着特定類型的數據,例如 HTTP 標頭、消息負載,等等。 來自不一樣數據流的幀能夠交錯發送,而後再根據每一個幀頭的數據流標識符從新組裝,從而在宏觀上實現了多個請求或響應並行傳輸的效果。
    http2.0

這裏的 多路複用機制 就實現了 在同一個TCP鏈接上 多個請求 並行執行。

不管是HTTP1.1的Keep-Alive機制仍是HTTP2.0的多路複用機制,在實現上都須要引入鏈接池來維護網絡鏈接。下面就開始分析 OkHttp中的鏈接池實現——鏈接攔截器ConnectInterceptor

ConnectInterceptor

鏈接攔截器ConnectInterceptor代碼以下:

//打開到目標服務的鏈接、處理下一個攔截器
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
  }
}
複製代碼

代碼不多,主要是使用transmitter.newExchange獲取Exchange實例,並做爲參數調用攔截器鏈的proceed的方法。注意到前面分析過的攔截器調用的proceed方法是一個參數的,而這裏是三個參數的。這是由於下一個攔截器(若是沒有配置網絡攔截器的話,就是CallServerInterceptor,也是最後一個)須要進行真正的網絡IO操做,而 Exchange(意爲交換)主要做用就是真正的IO操做:寫入請求、讀取響應(會在下一個攔截器作介紹)。

實際上獲取Exchange實例的邏輯處理都封裝在Transmitter中了。前面的文章提到過Transmitter,它是「發射器」,是把 請求 從應用端 發射到 網絡層,它持有請求的 鏈接、請求、響應 和 流,一個請求對應一個Transmitter實例,一個數據流。下面就看下它的newExchange方法:

Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }

    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }
複製代碼

如果第一次請求,前面兩個if是沒走進去的。接着看到使用exchangeFinder的find方法獲取到了ExchangeCodec實例,而後做爲參數構建了Exchange實例,並返回。嗯,看起來也很簡單的樣子。 注意到這個方法裏涉及了鏈接池RealConnectionPool、交換尋找器ExchangeFinder、交換編解碼ExchangeCodec、交換管理Exchange這幾個類(翻譯成這樣盡力了😊,意會吧)。

  • RealConnectionPool,鏈接池,負責管理請求的鏈接,包括新建、複用、關閉,理解上相似線程池。
  • ExchangeCodec,接口類,負責真正的IO操做—寫請求、讀響應,實現類有Http1ExchangeCodec、Http2ExchangeCodec,分別對應HTTP1.1協議、HTTP2.0協議。
  • Exchange,管理IO操做,能夠理解爲 數據流,是ExchangeCodec的包裝,增長了事件回調;一個請求對應一個Exchange實例。傳給下個攔截器CallServerInterceptor使用。
  • ExchangeFinder,(從鏈接池中)尋找可用TCP鏈接,而後經過鏈接獲得ExchangeCodec。

ExchangeFinder

ExchangeFinder的做用從名字就能夠看出——Exchange尋找者,本質是爲請求尋找一個TCP鏈接。若是已有可用鏈接就直接使用,沒有則打開一個新的鏈接。 一個網絡請求的執行,須要先有一個指向目標服務的TCP鏈接,而後再進行寫請求、讀響應的IO操做。ExchangeFinder是怎麼尋找的呢?繼續往下看~

咱們先看exchangeFinder初始化的地方:

public void prepareToConnect(Request request) {
    ...
    this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
        call, eventListener);
  }
複製代碼

看到這裏應該會想起上一篇文章中分析RetryAndFollowUpInterceptor時提到過,prepareToConnect這個方法做用是鏈接準備,就是建立了ExchangeFinder實例。主要到傳入的參數有connectionPool、createAddress方法返回的Address、call、eventListener。connectionPool是鏈接池,稍後分析,先看下createAddress方法:

private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
      sslSocketFactory = client.sslSocketFactory();
      hostnameVerifier = client.hostnameVerifier();
      certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
        client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
  }
複製代碼

使用url和client配置 建立一個Address實例。Address意思是指向服務的鏈接的地址,能夠理解爲請求地址及其配置。Address有一個重要做用:相同Address的HTTP請求 共享 相同的鏈接。這能夠做爲前面提到的 HTTP1.1和HTTP2.0 複用鏈接 的請求的判斷。

回頭看exchangeFinder的find方法

public ExchangeCodec find( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //找到一個健康的鏈接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      //利用鏈接實例化ExchangeCodec對象,若是是HTTP/2返回Http2ExchangeCodec,不然返回Http1ExchangeCodec
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }
複製代碼

主要就是經過findHealthyConnection方法獲取鏈接RealConnection實例,而後用RealConnection的newCodec方法獲取了ExchangeCodec實例,若是是HTTP/2返回Http2ExchangeCodec,不然返回Http1ExchangeCodec,而後返回。

findHealthyConnection方法名透露着 就是去尋找可用TCP鏈接的,而咱們猜想這個方法內部確定和鏈接池ConnectionPool有緊密的關係。接着跟進findHealthyConnection方法:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      //找鏈接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // 是新鏈接 且不是HTTP2.0 就不用體檢
      synchronized (connectionPool) {
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          return candidate;
        }
      }
      // 體檢不健康,繼續找
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      	//標記不可用
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }
複製代碼

循環尋找鏈接:若是是不健康的鏈接,標記不可用(標記後會移除,後面講鏈接池會講到),而後繼續找。健康是指鏈接能夠承載新的數據流,socket是鏈接狀態。咱們跟進findConnection方法,看看究竟是怎麼 找鏈接 的:

//爲承載新的數據流 尋找 鏈接。尋找順序是 已分配的鏈接、鏈接池、新建鏈接
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      //請求已被取消(Call的cancel方法->transmitter的cancel方法),拋異常
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; 

      // 嘗試使用 已給數據流分配的鏈接.(例如重定向請求時,能夠複用上次請求的鏈接)
      releasedConnection = transmitter.connection;
      //有已分配的鏈接,但已經被限制承載新的數據流,就嘗試釋放掉(若是鏈接上已沒有數據流),並返回待關閉的socket。
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if (transmitter.connection != null) {
        // 不爲空,說明上面沒有釋放掉,那麼此鏈接可用
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // 沒有已分配的可用鏈接,就嘗試從鏈接池獲取。(鏈接池稍後詳細講解)
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry;//有可嘗試的路由
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);//(若是有)關閉待關閉的socket

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);//(若是有)回調鏈接釋放事件
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);//(若是有)回調(從鏈接池)獲取鏈接事件
    }
    if (result != null) {
      // 若是有已分配可用鏈接 或 從鏈接池獲取到鏈接,結束! 沒有 就走下面的新建鏈接過程。
      return result;
    }

    // 若是須要路由信息,就獲取。是阻塞操做
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        //如今有了IP地址,再次嘗試從鏈接池獲取。可能會由於鏈接合併而匹配。(這裏傳入了routes,上面的傳的null)
        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }
	  //第二次鏈接池也沒找到,就新建鏈接
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // 若是第二次從鏈接池的嘗試成功了,結束,由於鏈接池中的鏈接是已經和服務器創建鏈接的
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 第二次沒成功,就把新建的鏈接,進行TCP + TLS 握手,與服務端創建鏈接. 是阻塞操做
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());//從失敗名單中移除

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // 最後一次嘗試從鏈接池獲取,注意最後一個參數爲true,即要求 多路複用(http2.0)
      //意思是,若是本次是http2.0,那麼爲了保證 多路複用性,(由於上面的握手操做不是線程安全)會再次確認鏈接池中此時是否已有一樣鏈接
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // 若是獲取到,就關閉咱們建立裏的鏈接,返回獲取的鏈接
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;

        // 那麼這個剛剛鏈接成功的路由 就能夠 用做下次 嘗試的路由
        nextRouteToTry = selectedRoute;
      } else {
        //最後一次嘗試也沒有的話,就把剛剛新建的鏈接存入鏈接池
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);//把鏈接賦給transmitter
      }
    }
    closeQuietly(socket);//若是剛剛創建的鏈接沒用到,就關閉

    eventListener.connectionAcquired(call, result);
    return result;
  }
複製代碼

代碼看着很長,已經加了註釋,方法目的就是 爲 承載新的數據流 尋找 鏈接。尋找順序是 已分配的鏈接、鏈接池、新建鏈接。梳理以下:

  1. 首先會嘗試使用 已給數據流分配的鏈接。(已分配鏈接的狀況例如重定向時的再次請求,說明上次已經有了鏈接)
  2. 若沒有 已分配的可用鏈接,就嘗試從鏈接池中 匹配獲取。由於此時沒有路由信息,因此匹配條件:address一致——host、port、代理等一致,且 匹配的鏈接能夠接受新的數據流。
  3. 若從鏈接池沒有獲取到,則取下一個代理的路由信息(多個Route,即多個IP地址),再次嘗試從鏈接池獲取,此時可能由於鏈接合併而匹配到。
  4. 若第二次也沒有獲取到,就建立RealConnection實例,進行TCP + TLS 握手,與服務端創建鏈接。
  5. 此時爲了確保Http2.0鏈接的多路複用性,會第三次從鏈接池匹配。由於新創建的鏈接的握手過程是非線程安全的,因此此時可能鏈接池新存入了相同的鏈接。
  6. 第三次若匹配到,就使用已有鏈接,釋放剛剛新建的鏈接;若未匹配到,則把新鏈接存入鏈接池並返回。

流程圖以下:

findConnection方法流程

看到這裏,小盆友,你是否有不少問號?

  • 開始如有了已分配的鏈接,但已經被限制承載新的數據流,是如何釋放的呢?
  • 代理路由信息是如何獲取的呢?
  • 如何從鏈接池獲取鏈接的?三次有什麼不一樣?

不要緊,慢慢來,咱們先看第二個問號,代理路由信息的獲取。

RouteSelector

先來看下Route類:

public final class Route {
  final Address address;
  final Proxy proxy;//代理
  final InetSocketAddress inetSocketAddress;//鏈接目標地址

  public Route(Address address, Proxy proxy, InetSocketAddress inetSocketAddress) {
    ...
    this.address = address;
    this.proxy = proxy;
    this.inetSocketAddress = inetSocketAddress;
  }
複製代碼

Route,經過代理服務器信息 proxy、鏈接目標地址 InetSocketAddress 來描述一條 鏈接服務器的具體路由

  • proxy代理:能夠爲客戶端顯式配置代理服務器。不然,將使用ProxySelector代理選擇器。可能會返回多個代理。
  • IP地址:不管是直連仍是代理,打開socket鏈接都須要IP地址。 DNS服務可能返回多個IP地址嘗試。

上面分析的findConnection方法中是使用routeSelection.getAll()獲取Route集合routes,而routeSelection是經過routeSelector.next()獲取,routeSelector是在ExchangeFinder的構造方法內建立的,也就是說routeSelector在RetryAndFollowUpInterceptor中就建立了,那麼咱們看下RouteSelector:

RouteSelector(Address address, RouteDatabase routeDatabase, Call call,
      EventListener eventListener) {
    this.address = address;
    this.routeDatabase = routeDatabase;//鏈接池中的路由黑名單(鏈接失敗的路由)
    this.call = call;
    this.eventListener = eventListener;

    resetNextProxy(address.url(), address.proxy());
  }
  //收集代理服務器
  private void resetNextProxy(HttpUrl url, Proxy proxy) {
    if (proxy != null) {
      // 若指定了代理,那麼就這一個。(就是初始化OkhttpClient時配置的)
      proxies = Collections.singletonList(proxy);
    } else {
      //沒配置就使用ProxySelector獲取代理(若初始化OkhttpClient時沒有配置ProxySelector,會使用系統默認的) 
      List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri());
      proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty()
          ? Util.immutableList(proxiesOrNull)
          : Util.immutableList(Proxy.NO_PROXY);
    }
    nextProxyIndex = 0;
  }
複製代碼

注意到RouteSelector的構造方法中傳入了routeDatabase,是鏈接失敗的路由黑名單(後面鏈接池也會講到),並使用resetNextProxy方法獲取代理服務器列表:若沒有指定proxy就是用ProxySelector獲取proxy列表(若沒有配置ProxySelector會使用系統默認)。接着看next方法:

//收集代理的路由信息
  public Selection next() throws IOException {
    if (!hasNext()) {//還有下一個代理
      throw new NoSuchElementException();
    }

    List<Route> routes = new ArrayList<>();
    while (hasNextProxy()) {
      Proxy proxy = nextProxy();
      //遍歷proxy經DNS後的全部IP地址,組裝成Route
      for (int i = 0, size = inetSocketAddresses.size(); i < size; i++) {
        Route route = new Route(address, proxy, inetSocketAddresses.get(i));
        if (routeDatabase.shouldPostpone(route)) {//此路由在黑名單中,存起來最後嘗試
          postponedRoutes.add(route);
        } else {
          routes.add(route);
        }
      }

      if (!routes.isEmpty()) {
        break;
      }
    }

    if (routes.isEmpty()) {
      // 若沒有拿到路由,就嘗試上面存的黑名單的路由
      routes.addAll(postponedRoutes);
      postponedRoutes.clear();
    }
    //routes包裝成Selection返回
    return new Selection(routes);
  }
複製代碼

next方法主要就是獲取下一個代理Proxy的代理信息,即多個路由。具體是在resetNextInetSocketAddress方法中實現,主要是對代理服務地址進行DNS解析獲取多個IP地址,這裏就不展開了,具體能夠參考OkHttp中的代理和路由

好了,到這裏 就解決了第二個問號。其餘兩個問號涉及 鏈接池RealConnectionPool、鏈接RealConnection,下面就來瞅瞅。

ConnectionPool

ConnectionPool,即鏈接池,用於管理http1.1/http2.0鏈接重用,以減小網絡延遲。相同Address的http請求能夠共享一個鏈接,ConnectionPool就是實現了鏈接的複用。

public final class ConnectionPool {
  final RealConnectionPool delegate;
  //最大空閒鏈接數5,最大空閒時間5分鐘
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }
  
  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
  }
  //返回空閒鏈接數
  public int idleConnectionCount() {
    return delegate.idleConnectionCount();
  }
  //返回池子中的鏈接數
  public int connectionCount() {
    return delegate.connectionCount();
  }
  //關閉並移除因此空閒鏈接
  public void evictAll() {
    delegate.evictAll();
  }
}
複製代碼

ConnectionPool看起來比較好理解,默認配置是最大空閒鏈接數5,最大空閒時間5分鐘(即一個鏈接空閒時間超過5分鐘就移除),咱們也能夠在初始化okhttpClient時進行不一樣的配置。須要注意的是ConnectionPool是用於應用層,實際管理者是RealConnectionPool。RealConnectionPool是okhttp內部真實管理鏈接的地方。

鏈接池對鏈接的管理無非是 存、取、刪,上面的兩個問號分別對應 刪、取,跟進RealConnectionPool咱們一個個看:

private final Deque<RealConnection> connections = new ArrayDeque<>();
  
  private final Runnable cleanupRunnable = () -> {
    //循環清理
    while (true) {
      //清理
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            //下一次清理以前的等待
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
  //存
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
複製代碼

connections是用於存鏈接的隊列Deque。看到在add以前 使用線程池executor執行了cleanupRunnable,意思是清理鏈接,爲啥要清理呢?上面提到過 鏈接池有 最大空閒鏈接數、最大空閒時間的限制,因此不知足時是要進行清理的。而且注意到清理是一個循環,而且下一次清理前要等待waitNanos時間,啥意思呢?咱們看下cleanup方法:

long cleanup(long now) {
    int inUseConnectionCount = 0;//正在使用的鏈接數
    int idleConnectionCount = 0;//空閒鏈接數
    RealConnection longestIdleConnection = null;//空閒時間最長的鏈接
    long longestIdleDurationNs = Long.MIN_VALUE;//最長的空閒時間

    //遍歷鏈接:找到待清理的鏈接, 找到下一次要清理的時間(還未到最大空閒時間)
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //若鏈接正在使用,continue,正在使用鏈接數+1
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
		//空閒鏈接數+1
        idleConnectionCount++;

        // 賦值最長的空閒時間和對應鏈接
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }
	  //若最長的空閒時間大於5分鐘 或 空閒數 大於5,就移除並關閉這個鏈接
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // else,就返回 還剩多久到達5分鐘,而後wait這個時間再來清理
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //鏈接沒有空閒的,就5分鐘後再嘗試清理.
        return keepAliveDurationNs;
      } else {
        // 沒有鏈接,不清理
        cleanupRunning = false;
        return -1;
      }
    }
	//關閉移除的鏈接
    closeQuietly(longestIdleConnection.socket());

    //關閉移除後 馬上 進行下一次的 嘗試清理
    return 0;
  }
複製代碼

思路仍是很清晰的:

  • 有空閒鏈接的話,若是最長的空閒時間大於5分鐘 或 空閒數 大於5,就移除關閉這個最長空閒鏈接;若是 空閒數 不大於5 且 最長的空閒時間不大於5分鐘,就返回到5分鐘的剩餘時間,而後等待這個時間再來清理。
  • 沒有空閒鏈接就等5分鐘後再嘗試清理。
  • 沒有鏈接不清理。

其中判斷鏈接正在使用的方法pruneAndGetAllocationCount咱們來看下:

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    //鏈接上的數據流,弱引用列表
    List<Reference<Transmitter>> references = connection.transmitters;
    for (int i = 0; i < references.size(); ) {
      Reference<Transmitter> reference = references.get(i);
      if (reference.get() != null) {
        i++;
        continue;
      }

      // 到這裏,transmitter是泄漏的,要移除,且此鏈接不能再承載新的數據流(泄漏的緣由就是下面的message)
      TransmitterReference transmitterRef = (TransmitterReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
      references.remove(i);
      connection.noNewExchanges = true;

      //鏈接由於泄漏沒有數據流了,那麼能夠當即移除了。因此設置 開始空閒時間 是5分鐘前(厲害厲害!)
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    //返回鏈接上的數據流數量,大於0說明正在使用。
    return references.size();
  }
複製代碼

邏輯註釋已經標明瞭,很好理解。其中connection.transmitters,表示在此鏈接上的數據流,transmitters size大於1即表示多個請求複用此鏈接。

另外,在findConnection中,使用connectionPool.put(result)存鏈接後,又調用transmitter.acquireConnectionNoEvents方法,瞅下:

void acquireConnectionNoEvents(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();
    this.connection = connection;
    connection.transmitters.add(new TransmitterReference(this, callStackTrace));
  }
複製代碼

先把鏈接賦給transmitter,表示數據流transmitter依附在這個connection上;而後connection.transmitters add 這個transmitter的弱引用,connection.transmitters表示這個鏈接承載的全部數據流,即承載的全部請求。

好了,存 講完了,主要就是把鏈接存入隊列,同時開始循環嘗試清理過時鏈接。

//爲transmitter 從鏈接池 獲取 對應address的鏈接。若果routes不爲空,可能會由於 鏈接合併(複用) 而獲取到HTTP/2鏈接。
  boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter, @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (requireMultiplexed && !connection.isMultiplexed()) continue;
      if (!connection.isEligible(address, routes)) continue;
      transmitter.acquireConnectionNoEvents(connection);
      return true;
    }
    return false;
  }
複製代碼

存的方法名是put,但你發現 取 的方法名卻不是get,transmitterAcquirePooledConnection意思是 爲transmitter 從鏈接池 獲取鏈接,實際上transmitter就表明一個數據流,也就是一個http請求。注意到,在遍歷中 通過判斷後也是transmitter的acquireConnectionNoEvents方法,即把匹配到的connection賦給transmitter。因此方法名仍是很生動的。

繼續看是如何匹配的:若是requireMultiplexed爲false,即不是多路複用(不是http/2),那麼就要看Connection的isEligible方法了,isEligible方法返回true,就表明匹配成功:

//用於判斷 鏈接 是否 能夠承載指向address的數據流
  boolean isEligible(Address address, @Nullable List<Route> routes) {
    //鏈接再也不接受新的數據流,false
    if (transmitters.size() >= allocationLimit || noNewExchanges) return false;

    //匹配address中非host的部分
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    //匹配address的host,到這裏也匹配的話,就return true
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }

    //到這裏hostname是沒匹配的,可是仍是有機會返回true:鏈接合併
    // 1. 鏈接須是 HTTP/2.
    if (http2Connection == null) return false;

    // 2. IP 地址匹配
    if (routes == null || !routeMatchesAny(routes)) return false;

    // 3. 證書匹配
    if (address.hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. 證書 pinning 匹配.
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

  private boolean routeMatchesAny(List<Route> candidates) {
    for (int i = 0, size = candidates.size(); i < size; i++) {
      Route candidate = candidates.get(i);
      if (candidate.proxy().type() == Proxy.Type.DIRECT
          && route.proxy().type() == Proxy.Type.DIRECT
          && route.socketAddress().equals(candidate.socketAddress())) {
        return true;
      }
    }
    return false;
  }
複製代碼

取的過程就是 遍歷鏈接池,進行地址等一系列匹配,到這裏第三個問號也解決了。

//移除關閉空閒鏈接
  public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.transmitters.isEmpty()) {
          connection.noNewExchanges = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }

    for (RealConnection connection : evictedConnections) {
      closeQuietly(connection.socket());
    }
  }
複製代碼

遍歷鏈接池,若是鏈接上的數據流是空,那麼就從鏈接池移除而且關閉。

咱們回過頭看下Transmitter的releaseConnectionNoEvents方法,也就第一個問號,若是鏈接再也不接受新的數據流,就會調用這個方法:

//從鏈接上移除transmitter.
  @Nullable Socket releaseConnectionNoEvents() {
    assert (Thread.holdsLock(connectionPool));

    int index = -1;
    //遍歷 此數據流依附的鏈接 上的全部數據流,找到index
    for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
      Reference<Transmitter> reference = this.connection.transmitters.get(i);
      if (reference.get() == this) {
        index = i;
        break;
      }
    }

    if (index == -1) throw new IllegalStateException();
	//transmitters移除此數據流
    RealConnection released = this.connection;
    released.transmitters.remove(index);
    this.connection = null;
	//若是鏈接上沒有有數據流了,就置爲空閒(等待清理),並返回待關閉的socket
    if (released.transmitters.isEmpty()) {
      released.idleAtNanos = System.nanoTime();
      if (connectionPool.connectionBecameIdle(released)) {
        return released.socket();
      }
    }

    return null;
  }
複製代碼

主要就是嘗試釋放鏈接,鏈接上沒有數據流就關閉socket等待被清理。

好了,到這裏鏈接池的管理就分析完了。

從鏈接的查找 到 鏈接池的管理,就是ConnectInterceptor的內容了。

CallServerInterceptor

哎呀,終於到最後一個攔截器了!

請求服務攔截器,也就是真正地去進行網絡IO讀寫了——寫入http請求的header和body數據、讀取響應的header和body。

上面ConnectInterceptor主要介紹瞭如何 尋找鏈接 以及 鏈接池如何管理鏈接。在獲取到鏈接後,調用了RealConnection的newCodec方法ExchangeCodec實例,而後使用ExchangeCodec實例建立了Exchange實例傳入CallServerInterceptor了。上面提到過ExchangeCodec負責請求和響應的IO讀寫,咱們先來看看ExchangeCodec建立過程——RealConnection的newCodec方法:

ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
    if (http2Connection != null) {
      return new Http2ExchangeCodec(client, this, chain, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1ExchangeCodec(client, this, source, sink);
    }
  }
複製代碼

http2Connection不爲空就建立Http2ExchangeCodec,不然是Http1ExchangeCodec。而http2Connection的建立是鏈接進行TCP、TLS握手的時候,即在RealConnection的connect方法中,具體就是connect方法中調用的establishProtocol方法:

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector, int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
    //針對http請求,若是配置的協議包含Protocol.H2_PRIOR_KNOWLEDGE,則開啓Http2鏈接
    if (route.address().sslSocketFactory() == null) {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        socket = rawSocket;
        protocol = Protocol.H2_PRIOR_KNOWLEDGE;
        startHttp2(pingIntervalMillis);
        return;
      }

      socket = rawSocket;
      protocol = Protocol.HTTP_1_1;
      return;
    }
	//針對https請求,會在TLS握手後,根據平臺獲取協議(),若是協議是Protocol.HTTP_2,則開啓Http2鏈接
    eventListener.secureConnectStart(call);
    connectTls(connectionSpecSelector);
    eventListener.secureConnectEnd(call, handshake);

    if (protocol == Protocol.HTTP_2) {
      startHttp2(pingIntervalMillis);
    }
  }

  private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build();
    http2Connection.start();
  }
複製代碼

好了,到這裏再也不深刻了,繼續瞭解能夠參考HTTP 2.0與OkHttp。那麼到這裏,ExchangeCodec已經建立了,而後又包裝成Exchange,最後傳入了CallServerInterceptor。

下面就來看看這最後一個攔截器:

public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();//上個攔截器傳入的exchange
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
	//寫請求頭
    exchange.writeRequestHeaders(request);

    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    //含body的請求
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // 若請求頭包含 "Expect: 100-continue" , 就會等服務端返回含有 "HTTP/1.1 100 Continue"的響應,而後再發送請求body. 
      //若是沒有收到這個響應(例如收到的響應是4xx),那就不發送body了。
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }
	  //responseBuilder爲null說明服務端返回了100,也就是能夠繼續發送body了
      if (responseBuilder == null) {
        if (request.body().isDuplex()) {//默認是false不會進入
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
          // 知足了 "Expect: 100-continue" ,寫請求body
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
       //沒有知足 "Expect: 100-continue" ,請求發送結束
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
     //沒有body,請求發送結束
      exchange.noRequestBody();
    }

	//請求發送結束
    if (request.body() == null || !request.body().isDuplex()) {
      exchange.finishRequest();
    }
	//回調 讀響應頭開始事件(若是上面沒有)
    if (!responseHeadersStarted) {
      exchange.responseHeadersStart();
    }
	//讀響應頭(若是上面沒有)
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }
	//構建response
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      //這裏服務端又返回了個100,就再嘗試獲取真正的響應()
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }
	//回調讀響應頭結束
    exchange.responseHeadersEnd(response);
	//這裏就是獲取響應body了
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }
	//請求頭中Connection是close,表示請求完成後要關閉鏈接
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }
	//204(無內容)、205(充值內容),body應該是空
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
	
    return response;
  }
}
複製代碼

你會發現,整個內容就是前面說的一句話:寫入http請求的header和body、讀取響應的header和body。這裏就再也不解釋了。

這裏咱們能夠看到,不管寫請求仍是讀響應,都是使用Exchange對應的方法。上面也提到過Exchange理解上是對ExchangeCodec的包裝,這寫方法內部除了事件回調和一些參數獲取外,核心工做都由 ExchangeCodec 對象完成,而 ExchangeCodec實際上利用的是 Okio,而 Okio 實際上仍是用的 Socket。

ExchangeCodec的實現類 有對應Http1.1的Http1ExchangeCodec 和 對應Http2.0的Http2ExchangeCodec。其中Http2ExchangeCodec是使用Http2.0中 數據幀 的概念完成請求響應的讀寫。關於Http1ExchangeCodec、Http2ExchangeCodec具體實現原理涉及okio這再也不展開。

最後一點,CallServerInterceptor的intercept方法中沒有調用鏈接器鏈Chain的proceed方法,由於這是最後一個攔截器啦!

好了,到這裏最後一個攔截器也分析完啦!

總結

本篇分析了ConnectInterceptor、CallServerInterceptor兩個攔截器的做用和原理。ConnectInterceptor負責鏈接的獲取,其中涉及到鏈接池的概念;CallServerInterceptor是真正的網絡IO讀寫。ConnectInterceptor涉及的內容較多,它是Okhttp的核心。 結合上一篇,咱們已經分析完了Okhttp內部全部的攔截器,最後給出Okhttp的總體架構圖(圖片來源):

okhttp

到這裏,Okhttp源碼解析部分就真的結束了,可真是一個漫長的過程! 從使用方式到工做流程,再到具體攔截器,掌握了這四篇文章的內容,應該說得上對Okhttp是比較熟悉了。這裏還計劃會出第五篇終章,來介紹一些Okhttp的常見問題和高級使用方式,敬請期待!


感謝與參考:

okhttp源碼解析

經過ConnectInterceptor源碼掌握OKHttp3網絡鏈接原理 嘔心瀝血第十彈【十】

OkHttp源碼深度解析

OkHttp源碼解析 (三)——代理和路由

OkHttp 源碼學習筆記(三) 數據交換的流 HTTPCodec

最後的最後,歡迎留言討論,若是你喜歡這一系列,或者以爲寫得還不錯,請幫忙 點贊、收藏和轉發,感謝

歡迎關注個人 公 衆 號

公衆號:胡飛洋
相關文章
相關標籤/搜索