在 OkHttp 知識梳理(1) - OkHttp 源碼解析之入門 中,介紹了OkHttp
的簡單使用及同步請求的實現流程,今天這篇文章,咱們來一塊兒學習一下異步請求的內部實現原理及線程調度。java
首先,讓咱們回顧一下異步請求的實現方式:面試
private void startAsyncRequest() {
//如下三步和同步請求的步驟相同。
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(URL).build();
Call call = client.newCall(request);
//區別在於拿到 RealCall 對象以後的處理方式。
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String result = response.body().string();
//返回結果給主線程。
Message message = mMainHandler.obtainMessage(MSG_UPDATE_UI, result);
mMainHandler.sendMessage(message);
}
});
}
複製代碼
能夠看到,對於異步請求而言,前面三步和同步請求是相同的,區別在於發起請求時,同步請求使用的是call.execute()
,只有當整個請求完成時纔會從.execute()
函數返回。緩存
而對於異步請求來講,.enqueue(Callback)
方法只要調用完就當即返回了,當網絡請求返回以後會回調Callback
的onResponse/onFailure
方法,而且這兩個回調方法是在子線程執行的,這也是異步請求和同步請求之間最主要的差異。服務器
下面咱們就來分析一下異步請求的內部實現邏輯。網絡
對於前面三步的內部實現再也不重複說明了,你們能夠查看 OkHttp 知識梳理(1) - OkHttp 源碼解析之入門 中的分析。最終咱們會獲得一個RealCall
實例,它表明了一個執行的任務。接下來看enqueue
內部作了什麼。多線程
public void enqueue(Callback responseCallback) {
//首先判斷該對象是否曾經被執行過。
synchronized(this) {
if(this.executed) {
throw new IllegalStateException("Already Executed");
}
this.executed = true;
}
//捕獲堆棧信息。
this.captureCallStackTrace();
//通知監聽者請求開始了。
this.eventListener.callStart(this);
//調用調度器的 enqueue 方法。
this.client.dispatcher().enqueue(new RealCall.AsyncCall(responseCallback));
}
複製代碼
這裏,咱們又見到了熟悉的dispatcher()
類,它enqueue
的實現爲:異步
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
//執行任務的線程池。
private ExecutorService executorService;
//等待被執行的異步請求任務隊列。
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque();
//正在被執行的異步請求任務隊列。
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque();
public synchronized ExecutorService executorService() {
if(this.executorService == null) {
this.executorService = new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false));
}
return this.executorService;
}
synchronized void enqueue(AsyncCall call) {
//若是當前正在請求的數量小於 64,而且對於同一 host 的請求小於 5,才發起請求。
if(this.runningAsyncCalls.size() < this.maxRequests && this.runningCallsForHost(call) < this.maxRequestsPerHost) {
//將該任務加入到正在請求的隊列當中。
this.runningAsyncCalls.add(call);
//經過線程池執行任務。
this.executorService().execute(call);
//不然加入到等待隊列當中。
} else {
this.readyAsyncCalls.add(call);
}
}
複製代碼
Dispatcher
的enqueue
首先會判斷:若是當前正在請求的數量小於64
,而且對於同一host
的請求小於5
,才發起請求。發起請求以前會將RealCall
加入到runningAsyncCalls
隊列當中,並經過ThreadPoolExecutor
來執行該請求,ide
ThreadPoolExecutor
是Java
提供的線程池,在 多線程知識梳理(6) - 線程池四部曲之 ThreadPoolExecutor 中咱們已經介紹過它,這裏根據它的參數配置能夠看出,它對應於CachedThreadPool
,該線程池的特色是 線程池大小無界,適用於執行不少的短時間異步任務的程序或者是負載較輕的服務器。 函數
SynchonousQueue
,它的 每一個插入操做都必須等待另外一個線程的移除操做,對於線程池而言,也就是說:在添加任務到等待隊列時,必需要有一個空閒線程正在嘗試從等待隊列獲取任務,纔有可能添加成功。60s
內都沒法獲取到新的任務將會被銷燬。線程池的execute
函數接收Runnable
的接口實現類做爲參數,在該任務被執行時將會調用它的run()
方法,這上面的AsyncCall
也是同樣的道理,它繼承了NamedRunnable
抽象類,而NamedRunnable
又實現了Runnable
接口,當NamedRunnable
的run()
方法被回調時,會調用AsyncCall
的execute()
方法。源碼分析
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
//responseCallback 就是調用 call.enqueue 方法時傳入的回調。
AsyncCall(Callback responseCallback) {
super("OkHttp %s", new Object[]{RealCall.this.redactedUrl()});
this.responseCallback = responseCallback;
}
//該函數是在子線程當中執行的。
protected void execute() {
boolean signalledCallback = false;
try {
//和同步請求的邏輯相同。
Response response = RealCall.this.getResponseWithInterceptorChain();
if(RealCall.this.retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
this.responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
this.responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException var6) {
if(signalledCallback) {
Platform.get().log(4, "Callback failure for " + RealCall.this.toLoggableString(), var6);
} else {
RealCall.this.eventListener.callFailed(RealCall.this, var6);
this.responseCallback.onFailure(RealCall.this, var6);
}
} finally {
//調用 Dispatcher 的 finished 方法
RealCall.this.client.dispatcher().finished(this);
}
}
}
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(this.name);
try {
//調用子類的 execute() 方法。
this.execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
複製代碼
execute()
方法最終是在 子線程當中執行的,這裏咱們看到了熟悉的一句話:
Response response = RealCall.this.getResponseWithInterceptorChain();
複製代碼
這裏面就是進行請求的核心邏輯,咱們在 OkHttp 知識梳理(1) - OkHttp 源碼解析之入門 中的3.4
節中已經介紹過了,這裏會經過一系列的攔截器進行處理,重試請求、緩存處理和網絡請求都是在裏面完成的,最終獲得返回的Response
,並根據狀況回調最開始傳入的Callback
的onResponse/onFailure
方法。
當回調完以後,最終會調用Dispatcher
的finished
方法:
void finished(AsyncCall call) {
//若是是異步請求,那麼最後一個參數爲 true。
this.finished(this.runningAsyncCalls, call, true);
}
void finished(RealCall call) {
//若是是同步請求,那麼最後一個參數爲 false。
this.finished(this.runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized(this) {
//從當前正在執行的任務列表中將它移除。
if (!calls.remove(call)) {
throw new AssertionError("Call wasn't in-flight!");
}
//尋找等待隊列中符合條件的任務去執行。
if (promoteCalls) {
this.promoteCalls();
}
runningCallsCount = this.runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
if (this.runningAsyncCalls.size() < this.maxRequests) {
if (!this.readyAsyncCalls.isEmpty()) {
Iterator i = this.readyAsyncCalls.iterator();
do {
if(!i.hasNext()) {
return;
}
AsyncCall call = (AsyncCall)i.next();
if (this.runningCallsForHost(call) < this.maxRequestsPerHost) {
i.remove();
//找到了等待隊列中符合執行的條件的任務,那麼就執行它。
this.runningAsyncCalls.add(call);
this.executorService().execute(call);
}
} while(this.runningAsyncCalls.size() < this.maxRequests);
}
}
}
複製代碼
這裏和同步請求相同,都會走到finished
方法當中,區別在於最後一次參數是true
,而同步請求是false
,也就是說會調用到promoteCalls
方法中,promoteCalls
的做用爲:在最開始時,若是不知足執行條件,那麼任務將會被加入到等待隊列readyAsyncCalls
中,那麼當一個任務執行完以後,就須要去等待隊列中尋找符合執行條件的任務,並將它加入到任務隊列中執行,以後的邏輯和前面的相同。
promoteCalls
函數除了在一個異步請求執行完畢後會調用,當咱們改變最大請求數量和對於同一個host
的最大請求數量時,也會觸發該查找過程。
//改變了最大請求數量。
public synchronized void setMaxRequests(int maxRequests) {
if(maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
} else {
this.maxRequests = maxRequests;
this.promoteCalls();
}
}
//改變了同一個 Host 的最大請求數量。
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
if(maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
} else {
this.maxRequestsPerHost = maxRequestsPerHost;
this.promoteCalls();
}
}
複製代碼
以上就是對於異步請求方式的源碼分析,由此咱們能夠總結出OkHttp
對於異步請求的調度方式:
runningAsyncCalls
中存放的是正在執行任務的列表,readyAsyncCalls
中則是等待被執行的任務。readyAsyncCalls
查找下一個能夠被執行的任務。ThreadPoolExecutor
在子線程中來完成的,由它來負責正在執行任務的調度,內部的實現原理如 多線程知識梳理(6) - 線程池四部曲之 ThreadPoolExecutor 所分析。AsyncCall
的execute()
函數中,這裏會經過一系列的攔截器進行處理,重試請求、緩存處理和網絡請求都是在裏面完成的,最終獲得返回的Response
,並根據狀況回調最開始傳入的Callback
的onResponse/onFailure
方法。Callback
的onResponse/onFailed
是在子線程當中執行的,所以若是要在其中執行更新UI
的操做,那麼須要通知主線程來更新。