OKHTTP3源碼和設計模式(下篇)


總體架構

《OKHTTP3源碼和設計模式(上篇)》,中總體介紹了 OKHttp3 的源碼架構,重點講解了請求任務的分發管理和線程池以及請求執行過程當中的攔截器。這一章咱們接着往下走認識一下 OKHttp3 底層鏈接和鏈接池工做機制。設計模式

RealCall 封裝了請求過程, 組織了用戶和內置攔截器,其中內置攔截器 retryAndFollowUpInterceptor -> BridgeInterceptor -> CacheInterceptor 完執行層的大部分邏輯 ,ConnectInterceptor -> CallServerInterceptor 兩個攔截器開始邁向鏈接層最終完成網絡請求。瀏覽器

鏈接層鏈接器

進入鏈接層

ConnectInterceptor 的工做很簡單, 負責打開鏈接; CallServerIntercerceptor 是核心鏈接器鏈上的最後一個鏈接器,
負責從當前鏈接中寫入和讀取數據。緩存

鏈接的打開

/** Opens a connection to the target server and proceeds to the next interceptor. */
        // 打開一個和目標服務器的鏈接,並把處理交個下一個攔截器
  public final class ConnectInterceptor implements Interceptor {
   public final OkHttpClient client;

   public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public 
  Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
            // 打開鏈接
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
     // 交個下一個攔截器
   return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}
複製代碼

單獨看 ConnectInterceptor 的代碼很簡單,不過鏈接正在打開的過程須要看看 streamAllocation.newStream(client, doExtensiveHealthChecks),內部執行過程。仍是先總體上了看看 StreamAllocation 這個類的做用。bash

StreamAllocation

StreamAllocation 處於上層請求和底層鏈接池直接 , 協調請求和鏈接池直接的關係。先來看看 StreamAllocation 對象在哪裏建立的? 回到以前文章中介紹的 RetryAndFollowUpInterceptor, 這是核心攔截器鏈上的頂層攔截器其中源碼:服務器

@Override 
    public Response intercept(Chain chain) throws IOException {
             Request request = chain.request();
       streamAllocation = new StreamAllocation(
       client.connectionPool(), createAddress(request.url()), callStackTrace);
      ...省略代碼
  }
複製代碼

這裏, 每一次請求建立了一個 StreamAllocation 對象, 那麼問題來了? 以前咱們說過每個 OkHttpClient 對象只有一個對應的鏈接池, 剛剛又說到 StreamAllocation 打開鏈接, 那麼 StreamAllocation 是如何建立鏈接池的呢?咱們很容易就去 StreamAllocation 中找鏈接池建立的邏輯,可是找不到。 鏈接池建立的地方在 OkHttpClient 中:cookie

public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
                // 建立鏈接池
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
}
複製代碼

OkHttpClient 默認構造函數的 Builder , 在這裏建立了鏈接池。因此這裏咱們也能夠看到, 若是咱們對默認鏈接池不滿,咱們是能夠直經過 builder 接指定的。
搞懂了 StreamAllocation 和 ConnectionPool 的建立 , 咱們再來看看 StreamAllocation 是怎麼打開鏈接的?直接兜源碼可能有點繞 ,先給一個粗略流程圖,而後逐點分析。網絡

鏈接建立請求流程

連接池實現

相信你們都有一些 Http 協議的基礎(若是沒有就去補了,否則看不懂)都知道 Http 的下層協議是 TCP。TCP 鏈接的建立和斷開是有性能開銷的,在 Http1.0 中,每一次請求就打開一個鏈接,在一些老的舊的瀏覽器上,若是仍是基於 Http1.0,體驗會很是差; Http1.1 之後支持長鏈接, 運行一個請求打開鏈接完成請求後, 鏈接能夠不關閉, 下次請求時複用此鏈接,從而提升鏈接的利用率。固然並非鏈接打開後一直開着不關,這樣又會形成鏈接浪費,怎麼管理?
在OKHttp3 的默認實現中,使用一個雙向隊列來緩存全部鏈接, 這些鏈接中最多隻能存在 5 個空閒鏈接,空閒鏈接最多隻能存活 5 分鐘。
鏈接架構

按期清理實現

public final class ConnectionPool {
    /**
    * Background threads are used to cleanup expired connections. There will be at most a single
    * thread running per connection pool. The thread pool executor permits the pool itself to be
    * garbage collected.
   */
        // 後臺按期清理鏈接的線程池
       private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
       new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

      /** The maximum number of idle connections for each address. */
      private final int maxIdleConnections;
     private final long keepAliveDurationNs;
          // 後臺按期清理鏈接的任務
     private final Runnable cleanupRunnable = new Runnable() {
    @Override 
        public void run() {
      while (true) {
                   // cleanup 執行清理
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
複製代碼

雙向隊列

// 存儲鏈接的雙向隊列
private final Deque<RealConnection> connections = new ArrayDeque<>();
複製代碼

放入鏈接

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
   }
   connections.add(connection);
 }
複製代碼

獲取鏈接

RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
}
複製代碼

StreamAllocation.鏈接建立和複用

ConnectionPool 的源碼邏輯仍是至關比較簡單, 主要提供一個雙向列表來存取鏈接, 使用一個定時任務按期清理無用鏈接。 二鏈接的建立和複用邏輯主要在 StreamAllocation 中。socket

尋找鏈接

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
  int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
  throws IOException {
while (true) {
       //  核心邏輯在 findConnection()中
  RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
      connectionRetryEnabled);

  // If this is a brand new connection, we can skip the extensive health checks.
  synchronized (connectionPool) {
    if (candidate.successCount == 0) {
      return candidate;
    }
  }

  // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
  // isn't, take it out of the pool and start again. if (!candidate.isHealthy(doExtensiveHealthChecks)) { noNewStreams(); continue; } return candidate; } } 複製代碼

findConnection():ide

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
  boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
  // 省略部分代碼...
  // Attempt to get a connection from the pool. Internal.instance 就是 ConnectionPool 的實例
  Internal.instance.get(connectionPool, address, this, null);
  if (connection != null) {
           // 複用此鏈接
    return connection;
  }
          // 省略部分代碼...
            // 建立新新鏈接
  result = new RealConnection(connectionPool, selectedRoute);
        // 引用計數
  acquire(result);
}

synchronized (connectionPool) {
  // Pool the connection. 放入鏈接池
  Internal.instance.put(connectionPool, result);
 }
  // 省略部分代碼...
 return result;
}       
複製代碼

StreamAllocation 主要是爲上層提供一個鏈接, 若是鏈接池中有複用的鏈接則複用鏈接, 若是沒有則建立新的。不管是拿到可複用的仍是建立新的, 都要爲此鏈接計算一下引用計數。

public void acquire(RealConnection connection) {
 assert (Thread.holdsLock(connectionPool));
 if (this.connection != null) throw new IllegalStateException();

 this.connection = connection;
     //  鏈接使用allocations列表來記錄每個引用
 connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
複製代碼

Realconnection

Realconnection 封裝了底層 socket 鏈接, 同時使用 OKio 來進行數據讀寫, OKio 是 square 公司的另外一個獨立的開源項目, 你們感興趣能夠去深刻讀下 OKio 源碼, 這裏不展開。

鏈接

/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
  private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
    ? address.socketFactory().createSocket()
    : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
              // 打開 socket 鏈接
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
           // 使用 OKil 連上 socket 後續讀寫使用 Okio
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
 }複製代碼
相關文章
相關標籤/搜索