OkHttp之因此可以高效處理任務的一個很重要緣由在於其內部維護了三個任務隊列(readyAsyncCalls、runningAsyncCalls、runningSyncCalls)和一個線程池(ThreadPoolExecutor)。這四個東西由內部的任務分發器dispathcer來進行調控處理,從而達到高效處理多任務的效果。java
線程池的做用不言而喻,他的主要做用在於能夠避免咱們在使用線程進行耗時任務的時候每次都開啓線程,用完以後又銷燬線程所帶來的效率與性能問題。他能夠對線程進行屢次的操做並複用空閒線程,從而達到不須要每次都開啓以及銷燬線程的目的。關於線程的知識,若是有不瞭解的能夠去參考我寫的這篇文章 Java中的線程詳解,裏面對線程池的各類類型還有內部操做有詳盡的介紹。緩存
okHttp中的任務隊列由兩部分組成:併發
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private Runnable idleCallback;
/** Executes calls. Created lazily. */
private ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
...
}
複製代碼
能夠看到,dispatcher裏面實例化了三個任務隊列readyAsyncCalls、runningAsyncCalls與runningSyncCalls還有一個線程池ThreadPoolExecutor。異步
readyAsyncCalls:等待執行異步任務隊列。當有任務須要dispatcher將其添加進入線程池時,會先判斷線程池是否還有能夠執行的線程,若是發現沒有執行的線程,此時先將任務放入到這個任務隊列中等待,等到線程池有空閒線程能夠執行任務的時候再從這個任務隊列中取出任務交給線程池去處理。async
runningAsyncCalls:運行中異步任務隊列。存儲dispatcher將任務交給線程池去處理的任務。ide
runningSyncCalls:運行中的同步隊列。同步隊列和異步隊列不一樣,他是一個串行的,而不是並行的,因此這個表明在同步操做的狀況下運行的隊列。函數
咱們看到在executeService()方法中建立了一個線程池ThreadPoolExecutor,裏面第一個參數核心線程數設置爲了0,表明在空閒一段時間後線程將會被所有銷燬。高併發
能夠看出,在Okhttp中,構建了一個閥值爲[0, Integer.MAX_VALUE]的線程池,它不保留任何最小線程數,隨時建立更多的線程數,當線程空閒時只能活60秒,它使用了一個不存儲元素的阻塞工做隊列,一個叫作"OkHttp Dispatcher"的線程工廠。性能
也就是說,在實際運行中,當收到10個併發請求時,線程池會建立十個線程,當工做完成後,線程池會在60s後相繼關閉全部線程。學習
dispatcher分發器相似於Ngnix中的反向代理,經過Dispatcher將任務分發到合適的空閒線程,實現非阻塞,高可用,高併發鏈接。
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
複製代碼
能夠看到同步請求總共作了四件事
判斷任務是否正在執行executed,若是正在執行則拋出異常。這表明同一個任務一次只能被執行一次,而並不能被執行屢次。
將任務交給任務調用器,dispatcher調用executed去執行這個任務。
經過getResponseWithInterceptorChain()鏈調用攔截器,以後將任務執行的結果返回Response。
4.以後在任務執行完成調用dispatcher將其finish掉。
至此一個同步請求任務就算完成了。這裏關於getResponseWithInterceptorChain()中執行的一些攔截器的操做,之後我會專門寫一篇文章來說解OkHttp的攔截器的原理。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//添加正在運行的請求
runningAsyncCalls.add(call);
//線程池執行請求
executorService().execute(call);
} else {
//添加到緩存隊列排隊等待
readyAsyncCalls.add(call);
}
}
複製代碼
在異步操做中,會先去判斷runningAsyncCalls隊列中的任務數量是否會大於最大請求數量(maxRequest),這個的最大請求數量爲64,而後在判斷是否runningCallsForHost是否小於maxRequestsPerHost(單一host請求)。若是兩個當中有一個不知足,則表明線程池中可執行的線程數不夠,不能將任務添加到線程中去執行。此時則將任務直接添加到緩存隊列排隊等待(readyAsyncCalls),等到有可執行的線程的時候再將任務添加到正在運行的隊列中,再調用線程池去執行call任務。
接下來看看execute裏面的源碼
@Override protected void execute() {
boolean signalledCallback = false;
try {
//執行耗時IO任務
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
//回調,注意這裏回調是在線程池中,而不是想固然的主線程回調
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//回調,同上
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最關鍵的代碼
client.dispatcher().finished(this);
}
}
複製代碼
能夠看到裏面有調用了攔截器鏈getResponseWithInterceptorChain(),並將任務的結果又一次返回Response。裏面會根據任務是否被Cancled而去回調不一樣的方法。被Canceled就去調用onFailure(0方法,在裏面處理失敗的邏輯,成功就去調用成功的方法Response(),並將返回值交給他去處理。最後不管成功仍是失敗都會去調用dispatcher的finish方法來結束掉這個任務。
咱們在來深刻看下finish的方法裏面作了哪些操做:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
複製代碼
接下來看看promoteCalls:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
複製代碼
promoteCalls的邏輯也很簡單:掃描待執行任務隊列,將任務放入正在執行任務隊列,並執行該任務。
以上就是整個任務隊列的實現細節,總結起來有如下幾個特色:
有興趣能夠關注個人小專欄,學習更多知識:小專欄