Android OkHttp源碼解析

使用

這裏不詳細講解如何使用,若是須要詳細瞭解使用,請參考個人另外一篇文章Android OkHttp3.0 基本使用 html

咱們看一下基本使用java

HttpLoggingInterceptor logInterceptor = new HttpLoggingInterceptor(new HttpLogger());
        logInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
        
        mOkHttpClient = new OkHttpClient.Builder()
                .connectTimeout(60, TimeUnit.SECONDS)
                .writeTimeout(60, TimeUnit.SECONDS)
                .readTimeout(60, TimeUnit.SECONDS)
                .addNetworkInterceptor(logInterceptor)
                .build();
        Request request = new Request.Builder()
                .url(url)
                .get()
                .build();
        Call call = mOkHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
               
            }
        });
複製代碼

下面咱們慢慢分析web

OkHttpClient.Builder

首先咱們經過Builder構建了一個OkHttpClient,Builder是OkHttpClient的一個內部類,其實就是一個Build模式,咱們看下Builder類是幹什麼的設計模式

public static final class Builder {
    Dispatcher dispatcher;
    Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs;
    final List<Interceptor> interceptors = new ArrayList<>();
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    EventListener.Factory eventListenerFactory;
    ProxySelector proxySelector;
    CookieJar cookieJar;
    Cache cache;
    InternalCache internalCache;
    SocketFactory socketFactory;
    SSLSocketFactory sslSocketFactory;
    CertificateChainCleaner certificateChainCleaner;
    HostnameVerifier hostnameVerifier;
    CertificatePinner certificatePinner;
    Authenticator proxyAuthenticator;
    Authenticator authenticator;
    ConnectionPool connectionPool;
    Dns dns;
    boolean followSslRedirects;
    boolean followRedirects;
    boolean retryOnConnectionFailure;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;

    public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }
    
    
  public OkHttpClient build() {
      return new OkHttpClient(this);
    }
複製代碼

Builder其實就是一個初始化參數的載體,好比你設置的超時時間,攔截器等,而後經過build方法把this賦值給OkHttpClient,也就是把這些初始化的參數交給了OkHttpClient緩存

Request

而後構建一個Request服務器

public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final RequestBody body;
  final Object tag;

  private volatile CacheControl cacheControl; // Lazily initialized.

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
  }
複製代碼

其實Request也是經過內部類Builder構建的,這也是Build模式,他的做用也是一個參數的載體,好比url,method,headers,bodycookie

Call

最後經過Call call = mOkHttpClient.newCall(request); 獲取Call對象網絡

** OkHttpClient.java
  
  @Override 
  public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }
複製代碼

內部是建立了一個RealCall對象,咱們看一下RealCall就是實際發出請求的對象,他有倆個方法一個是同步請求,一個異步請求異步

同步請求socket

try {
            Response execute = call.execute();
        } catch (IOException e) {
            e.printStackTrace();
        }
複製代碼

咱們看一下源碼

** RealCall.java
 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  
複製代碼

這裏作了四件事

  • 判斷這個Call是否已經被執行過,每一個Call只能被執行一次,若是須要一個徹底同樣的Call,能夠用Call.clone()方法克隆
  • 調用client.dispatcher().executed(this)方法,在同步中,其實這個方法並無太大做用,只是把Call加入到了一個集合中,而後方便以後的取消,和獲取正在運行的Call
  • 而後調用Response result = getResponseWithInterceptorChain();獲取Http返回的結果,這個是真正進行請求的方法
  • 最後調用client.dispatcher().finished(this);方法通知Call已經執行完畢

異步請求

call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
               
            }
        });
複製代碼

咱們看一下異步請求的源碼

**RealCall.java
 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
複製代碼
  • 首先仍是先判斷了是否已經執行過了
  • 而後調用了client.dispatcher().enqueue(new AsyncCall(responseCallback));去執行請求

咱們先看一下AsyncCall

final class AsyncCall extends NamedRunnable {
    ...
    @Override protected void execute() {
    ...
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
          ...
    }
  }
  
 public abstract class NamedRunnable implements Runnable {
  protected final String name;
  ...
  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
複製代碼

咱們能夠看到AsyncCall其實就是一個Runnable,當執行這個Runnable的時候對調用execute方法,而execute方法中仍是調用了Response response = getResponseWithInterceptorChain();來進行真正的請求

咱們再來分析一下dispatcher

public final class Dispatcher {
  //最大的請求數
  private int maxRequests = 64;
  //每臺主機的最大請求數
  private int maxRequestsPerHost = 5;
  private Runnable idleCallback;

  //線程池
  private ExecutorService executorService;

  //準備運行的異步Call
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //正在運行的異步Call
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  //正在運行的同步Call
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  //異步
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

//同步
 synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

複製代碼

咱們能夠看到他其實維護了一個線程池,和一些存儲請求的隊列,同步請求是是直接把Call加入到隊列中,異步請求是,看當前正在請求的數量來判斷加到那個隊列中,若是當前運行的請求,小於最大請求數,就用線程池運行這個Call

攔截器

最終咱們發現,無論是同步請求仍是異步請求,最終都是調用了getResponseWithInterceptorChain()方法,進行的請求,咱們來分析一下這個方法

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }
複製代碼

interceptorOkHttp中很重要的一部分,採用了責任鏈的設計模式,他把網絡請求,緩存,重連,等工能統一了起來,每個功能都是一個Interceptor,他們在經過Interceptor.Chain環環相扣,最終完成一次網絡請求

咱們看一下他們究竟是怎麼環環相扣的

首先上方,把全部的攔截器放入到了一個集合中,構建了第一個攔截器RealInterceptorChain,看他內部作了什麼

public final class RealInterceptorChain implements Interceptor.Chain .... public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
   ...

    // Call the next interceptor in the chain.
    //構建下一個RealInterceptorChain,注意參數 index + 1
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
        //獲取第一個攔截器,並調用第一個攔截器的intercept方法
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

   ...

    return response;
  }
複製代碼

上方是刪減代碼,咱們看一下,從新構建RealInterceptor,注意參數index + 1,而後獲取第一個攔截器,而後調用了interceptor.intercept方法,咱們看一下其餘的interceptor的intercept方法作了什麼,第一個攔截器就是RetryAndFollowUpInterceptor

** RetryAndFollowUpInterceptor.java
  
  @Override public Response intercept(Chain chain) throws IOException {
   
   ...
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
   ...
  }
複製代碼

這是最關鍵的代碼,他除了走本身的邏輯以外,他還調用 ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);chain是上方傳入的next,而後index+1,會讓他執行下一個攔截器,如此循環

大概介紹一下每個攔截器具體作了什麼

  • client.interceptors()用戶自定義的攔截器
  • RetryAndFollowUpInterceptor負責重連和重定向
  • BridgeInterceptor負責把用戶的請求轉換爲發送到服務器的請求,把服務器的響應轉換爲用戶友好的響應
  • CacheInterceptor負責緩存管理
  • ConnectInterceptor負責和服務器創建鏈接
  • client.networkInterceptors用戶自定義攔截器,僅在網絡請求時有用
  • CallServerInterceptor負責向服務器發送請求,從服務器讀取響應

咱們今天只分析CacheInterceptorConnectInterceptor

CacheInterceptor

@Override 
 public Response intercept(Chain chain) throws IOException {
    //根據Request獲取緩存的Response
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    //根據傳入參數,判斷緩存的策略,是否使用了網絡或者緩存,或者倆者都使用
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
    ...

    //請求不使用網絡也不使用緩存,則直接返回code=504
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // 若是不須要使用網絡,則直接把緩存返回
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      //交給下一級去請求網絡
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
        //若是服務器返回的是304則 返回緩存結果
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    //若是不是304,須要緩存,就把請求結果放入緩存中
    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // 更新緩存
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }
複製代碼
  • 首先若是網絡和緩存都不用,直接返回504
  • 而後若是不使用不使用網絡,則返回緩存中數據
  • 上邊都不知足的話,交給下一級去請求網絡
  • 若是服務器返回的是304而且有緩存,則返回緩存數據
  • 若是上面都不知足,則返回請求結果,若是須要緩存則緩存數據
  • 緩存數據是用的DiskLruCache

ConnectInterceptor

這個攔截器只是打開了一個連接

@Override 
 public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
複製代碼

這段代碼很短,主要工做是,建立了一個HttpCodecRealConnectionHttpCodec的主要做用是編碼請求和解碼響應,RealConnection是用來向服務器發起連接,這裏面最重要就是獲取連接的時候用到了鏈接池ConnectionPool來複用連接

streamAllocation.newStream方法中,通過一系列調用,最終會調用到StreamAllocationfindConnection方法

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
    ...

      // Attempt to use an already-allocated connection.
      //嘗試用已分配的連接,若是連接容許建立新的流則返回連接,經過noNewStreams來判斷是否容許建立新的流
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      //嘗試從連接池中獲取一個可用的連接
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      //再次嘗試在鏈接池中獲取
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
      //從新創建一個連接
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    //這裏進行TCP和TLS的握手
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      //把新建的連接放入鏈接池
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
  }
複製代碼

上方的邏輯仍是很清楚的

  • 首先看當前連接是否可用,可用則返回
  • 而後從鏈接池中找可用連接,若是有則返回
  • 再次從鏈接池尋找可用鏈接,若是又則返回
  • 從新建立一個連接
  • 進行TCP和TLS握手
  • 把新建的鏈接放入鏈接池,而後返回

ConnectionPool 鏈接池

public final class ConnectionPool {

  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  //後臺線程用於清理過時連接
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
  //存儲連接的隊列
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;
  //取出連接
  RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }
//放入連接
void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
  
複製代碼

其實鏈接池是維護一個隊列儲存連接,每次放入連接還會觸發後臺線程清除失效連接,每次取出都會使用connection.isEligible校驗該連接是否能夠複用,其實鏈接池就是省去了創建鏈接和握手時間

如何進行連接

若是鏈接池沒有合適連接就會從新建立連接並握手, result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); 咱們看一下這個方法到底作了什麼

public void connect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
    ....
    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          connectSocket(connectTimeout, readTimeout);
        }
        //跟TLS握手
        establishProtocol(connectionSpecSelector);
        break;
        ...
    }
    
    
 private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  }

  public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);
  }
複製代碼

它內部分爲是否須要隧道,不須要創建隧道則,則調用connectSocket方法,最終是調用了socket.connect,創建完socket連接以後,須要進行TLS握手,TLS我就不介紹了,我也不熟悉,參考SSL/TLS 握手過程詳解

參考:juejin.im/post/58c603…

juejin.im/post/5bc89f…

blog.piasy.com/2016/07/11/…

相關文章
相關標籤/搜索