OkHttpClient源碼分析(一)—— 同步、異步請求分析和Dispatcher的任務調度

OkHttpClient同步請求的執行流程和源碼分析

同步請求示例

OkHttpClient okHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
        Request request = new Request.Builder().url("https://www.baidu.com").get().build();
        Call call = okHttpClient.newCall(request);
        try {
            Response response = call.execute();
            Log.e(TAG,"response: " + response.body().string());
        } catch (IOException e) {
            e.printStackTrace();
        }
複製代碼

同步請求的步驟

  1. 建立OkHttpClient對象和Request對象,均是採用Builder模式建立,構建者(Builder)設計模式(又叫生成器設計模式)web

  2. 將Request封裝成Call對象設計模式

  3. 調用Call的execute()方法發送同步請求,發送請求後,就會進入阻塞狀態,直到收到響應。緩存

1、(1)OkHttpClient Builder對象分析

public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
}
複製代碼

  OkHttpClient Builder的構造函數,主要是對一些參數賦值默認值,對一些對象進行初始化,Dispatcher是OkHttpClient中http請求的分發器,由它來決定異步請求是直接處理仍是進行緩存等待,對於同步請求,它並無作太多操做,只是把同步請求放到隊列當中去執行。ConnectionPool是一個鏈接池對象,用於管理鏈接對象,當存在一樣的Url請求時,能夠複用,從鏈接池中找到對應緩存的鏈接對象。bash

(2)Request 對象分析

public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
}
複製代碼

Request Builder的構造函數,默認請求方法爲GET,同時初始化一個Header對象。cookie

public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
}
複製代碼

build()方法是建立Request對象,將當前的builder對象傳入,接下來看Request的構造函數:異步

Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tags = Util.immutableMap(builder.tags);
  }
複製代碼

  能夠看到,是將傳入的Builder對象中的屬性賦值給Request的相關屬性,這樣就建立好了Request對象。socket

2、建立Call 對象分析

/**
   * Prepares the {@code request} to be executed at some point in the future.
   */
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
複製代碼

  OkHttpClient 對象中的newCall()方法,返回值是一個Call對象(接口),在這裏能夠看到實際上調用的RealCall.newRealCall()方法建立,RealCall是Call接口的一個實現類,接着查看RealCall類中newRealCall()方法:async

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }
複製代碼

  能夠看到newRealCall()方法中建立了Call接口的實現類RealCall對象並返回該對象,到此Call對象的建立便完成了。ide

3、Call 對象exexcute()方法分析

  上面有說起到Call對象是一個接口,咱們點擊查看exexcute()方法時,須要點擊查看該方法的實現,其實是進入到RealCall對象的exexcute()方法:函數

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }
複製代碼

  在同步代碼中,先經過判斷executed標識,若是當前已經有在執行,則會拋出"Already Executed"信息的異常,若是沒有執行過,則更改executed標識爲true。

  接着調用captureCallStackTrace()方法,這個方法主要用於捕捉一些http請求的異常堆棧信息。

  eventListener.callStart(this)開啓事件監聽,經過查看該方法:

/**
   * Invoked as soon as a call is enqueued or executed by a client. In case of thread or stream
   * limits, this call may be executed well before processing the request is able to begin.
   *
   * <p>This will be invoked only once for a single {@link Call}. Retries of different routes
   * or redirects will be handled within the boundaries of a single callStart and {@link
   * #callEnd}/{@link #callFailed} pair.
   */
  public void callStart(Call call) {
  }
複製代碼

  經過閱讀該方法的註釋,能夠知道該方法會在調用Call對象的enqueue()或execute()方法的時候,就會開啓這個listener。

接下來分析一下這個方法中的核心代碼:

client.dispatcher().executed(this);
複製代碼

首先調用OkHttpClient的dispatcher()方法

public Dispatcher dispatcher() {
    return dispatcher;
}
複製代碼

該方法返回一個Dispatcher對象,緊接着調用該對象的executed()方法:

/** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
複製代碼

  該方法中,runningSyncCalls是一個存放同步請求的隊列,這裏僅僅只是將RealCall加入到同步請求的隊列中,Dispatcher對象中相關的隊列有:

/** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); 複製代碼
  • readyAsyncCalls 是異步請求的就緒隊列
  • runningAsyncCalls 是異步請求的執行隊列
  • runningSyncCalls 是同步請求的執行隊列

  調用完Dispatcher的executed()方法後,緊接着調用getResponseWithInterceptorChain()方法獲取Response對象,這個實際上是一個攔截器鏈的方法,該方法內部會依次調用攔截器來進行相應的操做。

最後看一下finally中:

finally {
      client.dispatcher().finished(this);
}
複製代碼

  經過調用Dispatcher的finished()方法,傳入當前的RealCall對象,查看該方法的代碼能夠發現:

/** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }
複製代碼
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
複製代碼

  該方法繼續調用了其餘一個同名的的方法,將正在執行的同步請求隊列傳了進來,在同步代碼塊中,移除掉同步請求隊列中的call對象,並進行了判斷,若是移除出錯,則會拋出異常。接着判斷promoteCalls,因爲這裏傳入的promoteCalls爲false,因此不會走promoteCalls()方法。

  接着,對runningCallsCount從新賦值,runningCallsCount用於記錄當前正在執行的請求數,查看該方法的代碼:

public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
}
複製代碼

該方法很簡單,即返回正在執行的異步請求數和正在執行的同步請求數的總和。

if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
}
複製代碼

  最後經過判斷當前正在執行的請求數,若是當前沒有正在執行的請求數而且有設置閒置時的回調,則會回調其run()方法。

總結

  到此,同步請求的執行流程就已經分析完了,由上述的分析能夠知道,在同步請求中,Dispatcher分發器作的工做很是簡單,就兩個操做,保存同步請求和移除同步請求

OkHttpClient異步請求的執行流程和源碼分析

異步請求示例

OkHttpClient okHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
        Request request = new Request.Builder().url("https://www.baidu.com").get().build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });
複製代碼

異步請求的步驟

  1. 建立OkHttpClient對象和Request對象,均是採用Builder模式建立,構建者(Builder)設計模式(又叫生成器設計模式)

  2. 將Request封裝成Call對象

  3. 調用Call的enqueue()方法進行異步請求

同步和異步的區別

  1. 發起請求的方法調用

  2. 阻塞線程與否

源碼分析

  異步請求的前兩步,和同步請求的一致,都是一些準備工做,並無發起請求,這裏再也不重複說明,最主要的是第三步,調用Call對象的enqueue()方法,具體的實現仍是在RealCall類中,查看該方法代碼:

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
複製代碼

  前面的操做和同步請求的execute()方法類似,主要是 client.dispatcher().enqueue(new AsyncCall(responseCallback)) 這行代碼,調用Dispatcher的enqueue()方法,將Callback回調封裝成AsyncCall對象做爲參數傳入,經過查看代碼,瞭解到AsyncCall對象繼承自NamedRunnable對象,而NamedRunnable對象實現了Runnable接口,接着繼續查看Dispatcher的enqueue()方法源碼:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
複製代碼

  該方法前加了synchronized修飾符,是一個同步方法,根據判斷當前執行的異步請求數是否小於maxRequests(最大請求數,默認爲64) 且當前執行的異步請求隊列中相同主機的請求數小於maxRequestsPerHost(每一個主機最大請求數,默認爲5) 來進行處理,若是兩者都小於設置的值,則將該請求添加到runningAsyncCalls(異步請求執行隊列)中,不然則添加到readyAsyncCalls(異步請求準備隊列)中。

runningCallsForHost()方法的代碼:

/** Returns the number of running calls that share a host with {@code call}. */
  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.get().forWebSocket) continue;
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }
複製代碼

  經過註釋能夠知道,該方法返回同一個主機的請求數目,經過遍歷執行中的異步請求隊列,和傳入的AsyncCall對象的主機對比,若是相同則記錄數遞增,以此得到和傳入AsyncCall對象相同主機的請求數。

enqueue()方法中,主要的代碼:

executorService().execute(call);
複製代碼

這裏是進行異步請求操做的代碼,先看下executorService()方法的代碼:

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}
複製代碼

  該方法也是一個同步方法,主要用於返回 ExecutorService 對象,在這裏僅一次建立了線程池對象 ThreadPoolExecutor,第二個參數傳入了Integer的最大值,即線程池所能容納的最大線程數爲Integer.MAX_VALUE,雖然這裏設置了很大的值,可是實際狀況下並不是會達到最大值,由於上面enqueue()方法中有作了判斷,主要的仍是maxRequests這個值決定異步請求線程池的最大數量。

  executorService()方法返回了線程池對象,接着調用它的execute()方法,傳入實現Runnable接口的AsyncCall對象,上面說起到AsyncCall繼承NamedRunnable,而NamedRunnable對象實現了Runnable接口,因此咱們想知道該線程池執行這個任務作了什麼,就得看下NamedRunnable對象的 run() 方法:

@Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
}

複製代碼

該方法中,真正的處理邏輯是在execute()方法中:

protected abstract void execute();
複製代碼

而execute()方法是一個抽象方法,因此要回到繼承NamedRunnable對象的AsyncCall類中:

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
}
複製代碼

  這裏纔是真正進行異步請求操做的邏輯,一樣也是經過getResponseWithInterceptorChain()方法獲得Response對象,關於getResponseWithInterceptorChain()方法的分析在下面的文章裏將會介紹,接着經過判斷retryAndFollowUpInterceptor是否取消回調CallBack接口的onFailure()或onResponse()方法,最後finally中,和同步請求的處理同樣,調用了Dispatcher對象的finished()方法:

/** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }
複製代碼

也是調用了帶三個參數的finished()方法,傳入了runningAsyncCalls,call,第三個參數傳入了true。

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
}
複製代碼

這裏的處理和同步請求結束後的處理多了一個promoteCalls()方法的調用,由於這裏promoteCalls傳入了true,因此會走promoteCalls()方法:

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
複製代碼

  看完這個方法,會有一種恍然大悟的感受,由於上面調用enqueue()方法的時候,會根據狀況將請求添加到runningAsyncCalls(異步請求執行隊列)或readyAsyncCalls(異步請求準備隊列)中,而readyAsyncCalls隊列中的請求何時執行呢,相信在看enqueue()方法的時候會有這個疑問,看了promoteCalls()後疑問將會被解答,爲了方便閱讀再次貼上enqueue()方法:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
複製代碼

  promoteCalls()方法中,首先作了一些判斷,當runningAsyncCalls(異步請求執行隊列)已經達到設置的最大的請求數或當前readyAsyncCalls(異步請求準備隊列)中沒有請求的時候,則直接返回不作處理,若是知足條件,則會遍歷readyAsyncCalls隊列,將該請求添加到runningAsyncCalls隊列中,並調用 executorService().execute(call) 對該請求進行處理。

總結

  若是異步請求數超過最大請求數或同個主機最大請求數超過設置的值的時候,該請求就會添加到readyAsyncCalls(異步請求準備隊列)中,當執行完runningAsyncCalls(異步請求執行隊列)的請求後,將會調用Dispatcher的finished()三個參數的方法,第三個參數傳入true,會調用promoteCalls()方法,遍歷準備隊列readyAsyncCalls,將該隊列的中的請求添加到執行隊列runningAsyncCalls中,調用 executorService().execute(call)進行處理。

Dispatcher的做用

維護請求的狀態,並維護一個線程池,用於執行請求。

異步請求爲何須要兩個隊列

異步請求的設計能夠將其理解成生產者消費者模式,其中各個角色分別爲:

  • Dispatcher 生產者
  • ExecutorService 消費者池
  • Deque readyAsyncCalls 緩存
  • Deque runningAsyncCalls 正在運行的任務

當同步和異步請求結束後,會調用dispatcher的finished方法,將當前的請求從隊列中移除。

下一篇文章中,將爲你們講解一下OkHttp的攔截器鏈,感興趣的朋友能夠繼續閱讀:

OkHttpClient源碼分析(二) —— RetryAndFollowUpInterceptor和BridgeInterceptor

相關文章
相關標籤/搜索