OkHttpClient源碼分析(五)—— ConnectInterceptor和CallServerInterceptor

上一篇咱們介紹了緩存攔截器CacheInterceptor,本篇將介紹剩下的兩個攔截器: ConnectInterceptorCallServerInterceptor緩存

ConnectInterceptor

該攔截器主要是負責創建可用的連接,主要做用是打開了與服務器的連接,正式開啓了網絡請求。 查看其intercept()方法:bash

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //從攔截器鏈中獲取StreamAllocation對象
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    
    //建立HttpCodec對象
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    
    //獲取realConnetion
    RealConnection connection = streamAllocation.connection();

    //執行下一個攔截器,返回response
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
複製代碼

能夠看到intercept中的處理很簡單,主要有如下幾步操做:服務器

  1. 從攔截器鏈中獲取StreamAllocation對象,在講解第一個攔截器RetryAndFollowUpInterceptor的時候,咱們已經初步瞭解了StreamAllocation對象,在RetryAndFollowUpInterceptor中僅僅只是建立了StreamAllocation對象,並無進行使用,到了ConnectInterceptor中,StreamAllocation才被真正使用到,該攔截器的主要功能都交給了StreamAllocation處理;網絡

  2. 執行StreamAllocation對象的 newStream() 方法建立HttpCodec,用於處理編碼Request和解碼Response;socket

  3. 接着經過調用StreamAllocation對象的 connection() 方法獲取到RealConnection對象,這個RealConnection對象是用來進行實際的網絡IO傳輸的。ide

  4. 調用攔截器鏈的**proceed()**方法,執行下一個攔截器返回response對象。源碼分析

上面咱們已經瞭解了ConnectInterceptor攔截器的intercept()方法的總體流程,主要的邏輯是在StreamAllocation對象中,咱們先看下它的 newStream() 方法:ui

public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    ...
    try {
      //建立RealConnection對象
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      //建立HttpCodec對象
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
      
      synchronized (connectionPool) {
        codec = resultCodec;
        //返回HttpCodec對象
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
複製代碼

newStream()方法中,主要是建立了RealConnection對象(用於進行實際的網絡IO傳輸)和HttpCodec對象(用於處理編碼Request和解碼Response),並將HttpCodec對象返回。this

findHealthyConnection()方法用於建立RealConnection對象:編碼

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {//while循環
      //獲取RealConnection對象
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);
    
      //同步代碼塊判斷RealConnection對象的successCount是否爲0
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          //若是爲0則返回
          return candidate;
        }
      }

      //對連接池中不健康的連接作銷燬處理
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }
複製代碼

以上代碼主要作的事情有:

  1. 開啓一個while循環,經過調用findConnection()方法獲取RealConnection對象賦值給candidate;
  2. 若是candidate 的successCount 爲0,直接返回candidate,while循環結束;
  3. 調用candidate的isHealthy()方法,進行「健康檢查」,若是candidate是一個不「健康」的對象,其中不「健康」指的是Socket沒有關閉、或者它的輸入輸出流沒有關閉,則對調用noNewStreams()方法進行銷燬處理,接着繼續循環。

咱們看下findConnection()方法作了哪些操做:

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    ...
    RealConnection result = null;
    ...
    synchronized (connectionPool) {
      ...
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        //若是不爲 null,則複用,賦值給 result
        result = this.connection;
        releasedConnection = null;
      }
      ...
      //若是result爲 null,說明上面找不到能夠複用的
      if (result == null) {
        //從鏈接池中獲取,調用其get()方法
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          //找到對應的 RealConnection對象
          //更改標誌位,賦值給 result
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    
    ...
    if (result != null) {
      //已經找到 RealConnection對象,直接返回
      return result;
    }
    
    ...
     //鏈接池中找不到,new一個
     result = new RealConnection(connectionPool, selectedRoute);
    ...
    
    ...
    //發起請求
    result.connect(
        connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
    ...
    //存進鏈接池中,調用其put()方法
    Internal.instance.put(connectionPool, result);
    ...
    return result;
  }
複製代碼

以上代碼主要作的事情有:

  1. StreamAllocation的connection若是能夠複用則複用;
  2. 若是connection不能複用,則從鏈接池中獲取RealConnection對象,獲取成功則返回;
  3. 若是鏈接池裏沒有,則new一個RealConnection對象;
  4. 調用RealConnection的connect()方法發起請求;
  5. 將RealConnection對象存進鏈接池中,以便下次複用;
  6. 返回RealConnection對象。

ConnectionPool 鏈接池介紹

剛纔咱們說到從鏈接池中取出RealConnection對象時調用了Internal的get()方法,存進去的時候調用了其put()方法。其中Internal是一個抽象類,裏面定義了一個靜態變量instance:

public abstract class Internal {
    ...
    public static Internal instance;
    ...
}
複製代碼

instance的實例化是在OkHttpClient的靜態代碼塊中:

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
  ...
  static {
      Internal.instance = new Internal() {
         ...
          @Override public RealConnection get(ConnectionPool pool, Address address,
          StreamAllocation streamAllocation, Route route) {
            return pool.get(address, streamAllocation, route);
         }
         ...
         @Override public void put(ConnectionPool pool, RealConnection connection) {
           pool.put(connection);
         }
      };
  }
  ...
}
複製代碼

這裏咱們能夠看到實際上 Internal 的 get()方法和put()方法是調用了 ConnectionPool 的get()方法和put()方法,這裏咱們簡單看下ConnectionPool的這兩個方法:

private final Deque<RealConnection> connections = new ArrayDeque<>();

@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }
複製代碼

在get()方法中,經過遍歷connections(用於存放RealConnection的ArrayDeque隊列),調用RealConnection的isEligible()方法判斷其是否可用,若是可用就會調用streamAllocation的acquire()方法,並返回connection。

咱們看下調用StreamAllocation的acquire()方法到底作了什麼操做:

public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    //賦值給全局變量
    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    //建立StreamAllocationReference對象並添加到allocations集合中
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }
複製代碼
  1. 先是從鏈接池中獲取的RealConnection對象賦值給StreamAllocation的成員變量connection;

  2. 建立StreamAllocationReference對象(StreamAllocation對象的弱引用), 並添加到RealConnection的allocations集合中,到時能夠經過allocations集合的大小來判斷網絡鏈接次數是否超過OkHttp指定的鏈接次數。

接着咱們查看ConnectionPool 的put()方法:

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
複製代碼

put()方法在將鏈接添加到鏈接池以前,會先執行清理任務,經過判斷cleanupRunning是否在執行,若是當前清理任務沒有執行,則更改cleanupRunning標識,並執行清理任務cleanupRunnable。

咱們看下清理任務cleanupRunnable中到底作了哪些操做:

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        //對鏈接池進行清理,返回進行下次清理的間隔時間。
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              //進行等待
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
複製代碼

能夠看到run()方法裏面是一個while死循環,其中調用了cleanup()方法進行清理操做,同時會返回進行下次清理的間隔時間,若是返回的時間間隔爲-1,則會結束循環,若是不是-1,則會調用wait()方法進行等待,等待完成後又會繼續循環執行,具體的清理操做在cleanup()方法中:

long cleanup(long now) {
    //正在使用的鏈接數
    int inUseConnectionCount = 0;
    //空閒的鏈接數
    int idleConnectionCount = 0;
    //空閒時間最長的鏈接
    RealConnection longestIdleConnection = null;
    //最大的空閒時間,初始化爲 Long 的最小值,用於記錄全部空閒鏈接中空閒最久的時間
    long longestIdleDurationNs = Long.MIN_VALUE;

    synchronized (this) {
      //for循環遍歷connections隊列
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //若是遍歷到的鏈接正在使用,則跳過,continue繼續遍歷下一個
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        //當前鏈接處於空閒,空閒鏈接數++
        idleConnectionCount++;

        //計算空閒時間
        long idleDurationNs = now - connection.idleAtNanos;
        //空閒時間若是超過最大空閒時間
        if (idleDurationNs > longestIdleDurationNs) {
          //從新賦值最大空閒時間
          longestIdleDurationNs = idleDurationNs;
          //賦值空閒最久的鏈接
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        //若是最大空閒時間超過空閒保活時間或空閒鏈接數超過最大空閒鏈接數限制
        //則移除該鏈接
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        //若是存在空閒鏈接
        //計算出線程清理的時間即(保活時間-最大空閒時間),並返回
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
         //沒有空閒鏈接,返回keepAliveDurationNs
        return keepAliveDurationNs;
      } else {
        //鏈接池中沒有鏈接存在,返回-1
        cleanupRunning = false;
        return -1;
      }
    }

    //關閉空閒時間最長的鏈接
    closeQuietly(longestIdleConnection.socket());

    return 0;
  }
複製代碼

cleanup()方法經過for循環遍歷connections隊列,記錄最大空閒時間和空閒時間最長的鏈接;若是存在超過空閒保活時間或空閒鏈接數超過最大空閒鏈接數限制的鏈接,則從connections中移除,最後執行關閉該鏈接的操做。

主要是經過pruneAndGetAllocationCount()方法判斷鏈接是否處於空閒狀態:

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      ...
      
      references.remove(i);
      connection.noNewStreams = true;
      
      ...
      
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }
複製代碼

該方法經過for循環遍歷RealConnection的allocations集合,若是當前遍歷到的StreamAllocation被使用就遍歷下一個,不然就將其移除,若是移除後列表爲空,則返回0,因此若是方法的返回值爲0則說明當前鏈接處於空閒狀態,若是返回值大於0則說明鏈接正在使用。

CallServerInterceptor

接下來說解最後一個攔截器CallServerInterceptor了,查看intercept()方法:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //相關對象的獲取 
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    ...
    
    //寫入請求頭
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      //判斷是否有請求體
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        //詢問服務器是否願意接收請求體
        httpCodec.flushRequest();//刷新請求
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        //服務器願意接收請求體
        //寫入請求體
        ...
      } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();
      }
    }

    //結束請求
    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      //根據服務器返回的數據構建 responseBuilder對象
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    //構建 response對象
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    ...
    
    //設置 response的 body
    response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
   
   //若是請求頭中 Connection對應的值爲 close,則關閉鏈接
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }
    
    ...
    
    return response;
  }
複製代碼

以上代碼具體的流程:

  1. 從攔截器鏈中獲取到保存的相關對象;
  2. 調用HttpCodec的writeRequestHeaders()方法寫入請求頭;
  3. 判斷是否須要寫入請求體,先是判斷請求方法,若是知足,請求頭經過攜帶特殊字段Expect: 100-continue來詢問服務器是否願意接收請求體;
  4. 結束請求;
  5. 根據服務器返回的數據構建response對象;
  6. 關閉鏈接;
  7. 返回response;

  好了,到這裏OkHttpClient源碼分析就結束了,相信看完本套源碼解析會加深你對OkHttpClient的認識,同時也學到了其巧妙的代碼設計思路,在閱讀源碼的過程當中,咱們的編碼能力也逐步提高,若是想要寫更加優質的代碼,閱讀源碼是一件頗有幫助的事。

相關文章
相關標籤/搜索