關於做者html
郭孝星,程序員,吉他手,主要從事Android平臺基礎架構方面的工做,歡迎交流技術方面的問題,能夠去個人Github提issue或者發郵件至guoxiaoxingse@163.com與我交流。java
文章目錄android
本篇文章主要用來討論Java中多線程併發原理與實踐經驗,並非一篇使用例子教程,這方面內容能夠參考網上其餘文章。git
線程是比進程更加輕量級的調度單位,線程的引入能夠把進程的資源分配和執行調度分開,各個線程既能夠共享進程資源,又能夠獨立調度。程序員
一般你們都會這麼去解釋進程與線程的區別,在文章01Android進程框架:進程的啓動建立、啓動與調度流程中 咱們剖析了進程的本質,咱們這裏再簡單回憶一下。github
關於進程本質的描述:數據庫
咱們知道,代碼是靜態的,有代碼和資源組成的系統要想運行起來就須要一種動態的存在,進程就是程序的動態執行過程。何爲進程? 進程就是處理執行狀態的代碼以及相關資源的集合,包括代碼段、文件、信號、CPU狀態、內存地址空間等。數組
進程使用task_struct結構體來描述,以下所示:緩存
咱們接着來看看Java線程的建立序列圖,以下所示:安全
能夠看到,最終調用pthread庫的pthread_create()方法建立了新的線程,該線程也使用task_struct結構體來描述,可是它沒有本身獨立的地址空間,而是與其所在的進程共享地址空間和資源。
因此你能夠發現,對於虛擬機而言,除了是否具備獨立的地址空間外,進程與線程並無本質上的區別。
咱們接着來看看線程是如何調度的。
線程狀態流程圖圖
NEW、WAITING、TIMED_WAITING都比較好理解,咱們重點說一說RUNNABLE運行態和BLOCKED阻塞態。
線程進入RUNNABLE運行態通常分爲五種狀況:
線程進入BLOCKED阻塞態通常也分爲五種狀況:
咱們再來看看和線程狀態相關的一些方法。
線程安全,一般所說的線程安全指的是相對的線程安全,它指的是對這個對象單獨的操做是線程安全的,咱們在調用的時候無需作額外的保障措施。
什麼叫相對安全?🤔
🤞舉個栗子
咱們知道Java裏的Vector是個線程安全的類,在多線程環境下對其插入、刪除和讀取都是安全的,但這僅限於每次只有一個線程對其操做,若是多個線程同時操做 Vector,那它就再也不是線程安全的了。
final Vector<String> vector = new Vector<>();
while (true) {
for (int i = 0; i < 10; i++) {
vector.add("項:" + i);
}
Thread removeThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
Thread printThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
Log.d(TAG, vector.get(i));
}
}
});
removeThread.start();
printThread.start();
if (Thread.activeCount() >= 20) {
return;
}
}
複製代碼
可是程序卻crash了
正確的作法應該是vector對象加上同步鎖,以下:
final Vector<String> vector = new Vector<>();
while (true) {
for (int i = 0; i < 10; i++) {
vector.add("項:" + i);
}
Thread removeThread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (vector){
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
}
});
Thread printThread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (vector){
for (int i = 0; i < vector.size(); i++) {
Log.d(TAG, vector.get(i));
}
}
}
});
removeThread.start();
printThread.start();
if (Thread.activeCount() >= 20) {
return;
}
}
複製代碼
volatile也是互斥同步的一種實現,不過它很是的輕量級。
volatile有兩條關鍵的語義:
要理解volatile關鍵字,咱們得先從Java的線程模型開始提及。如圖所示:
Java內存模型規定了全部字段(這些字段包括實例字段、靜態字段等,不包括局部變量、方法參數等,由於這些是線程私有的,並不存在競爭)都存在主內存中,每一個線程會 有本身的工做內存,工做內存裏保存了線程所使用到的變量在主內存裏的副本拷貝,線程對變量的操做只能在工做內存裏進行,而不能直接讀寫主內存,固然不一樣內存之間也 沒法直接訪問對方的工做內存,也就是說主內存時線程傳值的媒介。
咱們來理解第一句話:
保證被volatile修飾的變量對全部線程都是可見的
如何保證可見性?🤔
被volatile修飾的變量在工做內存修改後會被強制寫回主內存,其餘線程在使用時也會強制從主內存刷新,這樣就保證了一致性。
關於「保證被volatile修飾的變量對全部線程都是可見的」,有種常見的錯誤理解:
錯誤理解:因爲volatile修飾的變量在各個線程裏都是一致的,因此基於volatile變量的運算在多線程併發的狀況下是安全的。
這句話的前半部分是對的,後半部分卻錯了,所以它忘記考慮變量的操做是否具備原子性這一問題。
:point_up:舉個栗子
private volatile int start = 0;
private void volatileKeyword() {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
start++;
}
}
};
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
Log.d(TAG, "start = " + start);
}
複製代碼
這段代碼啓動了10個線程,每次10次自增,按道理最終結果應該是100,可是結果並不是如此。
爲何會這樣?:thinking:
仔細看一下start++,它其實並不是一個原子操做,簡單來看,它有兩步:
因此volatile只能保證可見性,在不符合如下場景下咱們依然須要經過加鎖來保證原子性:
比方說咱們會在線程里加個boolean變量,來判斷線程是否中止,這種狀況就很是適合使用volatile。
咱們再來理解第二句話。
什麼是指令重排序?🤔
指令重排序是值指令亂序執行,即在條件容許的狀況下,直接運行當前有能力當即執行的後續指令,避開爲獲取下一條指令所需數據而形成的等待,經過亂序執行的技術,提供執行效率。
指令重排序繪製被volatile修飾的變量的賦值操做前,添加一個內存屏障,指令重排序時不能把後面的指令重排序的內存屏障以前的位置。
關於指令重排序不是本篇文章重點討論的內容,更多細節能夠參考指令重排序。
synchronized是互斥同步的一種實現。
synchronized:當某個線程訪問被synchronized標記的方法或代碼塊時,這個線程便得到了該對象的鎖,其餘線程暫時沒法訪問這個方法,只有等待這個方法執行完畢或者代碼塊執行完畢,這個 線程纔會釋放該對象的鎖,其餘線程才能執行這個方法或代碼塊。
前面咱們已經說了volatile關鍵字,這裏咱們舉個例子來綜合分析volatile與synchronized關鍵字的使用。
:point_up:舉個栗子
public class Singleton {
//volatile保證了:1 instance在多線程併發的可見性 2 禁止instance在操做是的指令重排序
private volatile static Singleton instance;
public static Singleton getInstance() {
//第一次判空,保證沒必要要的同步
if (instance == null) {
//synchronized對Singleton加全局所,保證每次只要一個線程建立實例
synchronized (Singleton.class) {
//第二次判空時爲了在null的狀況下建立實例
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
複製代碼
這是一個經典的DSL單例。
它的字節碼以下:
能夠看到被synchronized同步的代碼塊,會在先後分別加上monitorenter和monitorexit,這兩個字節碼都須要指定加鎖和解鎖的對象。
關於加鎖和解鎖的對象:
synchronized(this)添加的是對象鎖,synchronized(ClassName.class)添加的是類鎖,它們的區別以下:
對象鎖:Java的全部對象都含有1個互斥鎖,這個鎖由JVM自動獲取和釋放。線程進入synchronized方法的時候獲取該對象的鎖,固然若是已經有線程獲取了這個對象的鎖,那麼當前線 程會等待;synchronized方法正常返回或者拋異常而終止,JVM會自動釋放對象鎖。這裏也體現了用synchronized來加鎖的好處,方法拋異常的時候,鎖仍然能夠由JVM來自動釋放。
類鎖:對象鎖是用來控制實例方法之間的同步,類鎖是用來控制靜態方法(或靜態變量互斥體)之間的同步。其實類鎖只是一個概念上的東西,並非真實存在的,它只是用來幫助咱們理 解鎖定實例方法和靜態方法的區別的。咱們都知道,java類可能會有不少個對象,可是隻有1個Class對象,也就是說類的不一樣實例之間共享該類的Class對象。Class對象其實也僅僅是1個 java對象,只不過有點特殊而已。因爲每一個java對象都有1個互斥鎖,而類的靜態方法是須要Class對象。因此所謂的類鎖,不過是Class對象的鎖而已。獲取類的Class對象有好幾種,最簡 單的就是MyClass.class的方式。 類鎖和對象鎖不是同一個東西,一個是類的Class對象的鎖,一個是類的實例的鎖。也就是說:一個線程訪問靜態synchronized的時候,容許另外一個線程訪 問對象的實例synchronized方法。反過來也是成立的,由於他們須要的鎖是不一樣的。
關不一樣步鎖還有ReentrantLock,eentrantLockR相對於synchronized具備等待可中斷、公平鎖等更多功能,這裏限於篇幅,再也不展開。
咱們知道線程的建立、切換與銷燬都會花費比較大代價,因此很天然的咱們使用線程池來複用和管理線程。Java裏的線程池咱們一般經過ThreadPoolExecutor來實現。 接下來咱們就來分析ThreadPoolExecutor的相關原理,以及ThreadPoolExecutor在Android上的應用AsyncTask。
線程池有五種運行狀態,以下所示:
線程池狀態圖
另外,ThreadPoolExecutor是用一個AtomicInteger來記錄線程池狀態和線程池裏的線程數量的,以下所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;// 111
private static final int SHUTDOWN = 0 << COUNT_BITS;// 000
private static final int STOP = 1 << COUNT_BITS;// 001
private static final int TIDYING = 2 << COUNT_BITS;// 010
private static final int TERMINATED = 3 << COUNT_BITS;// 110
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//線程池狀態
private static int workerCountOf(int c) { return c & CAPACITY; }//線程池當前線程數
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
在正式介紹線程池調度原理以前,咱們先來回憶一下Java實現任務的兩個接口:
另外,還有個Future接口,它能夠對Runnable、Callable執行的任務進行判斷任務是否完成,中斷任務以及獲取任務結果的操做。咱們一般會使用它的實現類FutureTask,FutureTask是一個Future、Runnable 以及Callable的包裝類。利用它能夠很方便的完成Future接口定義的操做。FutureTask內部的線程阻塞是基於LockSupport來實現的。
咱們接下來看看線程池是和執行任務的。
ThreadPoolExecutor調度流程圖
execute(Runnable command)
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1. 若線程池狀態是RUNNING,線程池大小小於配置的核心線程數,則能夠在線程池中建立新線程執行新任務。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2. 若線程池狀態是RUNNING,線程池大小大於配置的核心線程數,則嘗試將任務插入阻塞隊列進行等待
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//若插入成功,則將次檢查線程池的狀態是否爲RUNNING,若是不是則移除當前任務並進入拒絕策略。
if (! isRunning(recheck) && remove(command))
reject(command);
//若是線程池中的線程數爲0,即線程池中的線程都執行完畢處於SHUTDOWN狀態,此時添加了一個null任務
//(由於SHUTDOWN狀態再也不接受新任務)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 若沒法插入阻塞隊列,則嘗試建立新線程,建立失敗則進入拒絕策略。
else if (!addWorker(command, false))
reject(command);
}
}
複製代碼
這個其實很好理解,打個比方。咱們公司的一個小組來完成任務,
addWorker(Runnable firstTask, boolean core)
addWorker(Runnable firstTask, boolean core) 表示添加個Worker,Worker實現了Runnable接口,是對Thread的封裝,該方法添加完Worker後,則調用runWorker()來啓動線程。
public class ThreadPoolExecutor extends AbstractExecutorService {
private boolean addWorker(Runnable firstTask, boolean core) {
//重試標籤
retry:
for (;;) {
int c = ctl.get();
//獲取當前線程池狀態
int rs = runStateOf(c);
//如下狀況表示再也不接受新任務:1 線程池沒有處於RUNNING狀態 2 要執行的任務爲空 3 阻塞隊列已滿
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取線程池當前的線程數
int wc = workerCountOf(c);
//若是超出容量,則再也不接受新任務,core表示是否使用corePoolSize做爲比較標準
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增長線程數
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//若是線程池狀態發生變化,從新開始循環
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//線程數增長成功,開始添加新線程,Worker是Thread的封裝類
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將新啓動的線程添加到線程池中
workers.add(w);
//更新線程池中線程的數量,注意這個數量不能超過largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//調用runWorker()方法,開始執行線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
複製代碼
runWorker(Worker w)
runWorker()方法是整個阻塞隊列的核心循環,在這個循環中,線程池會不斷的從阻塞隊列workerQueue中取出的新的task並執行。
public class ThreadPoolExecutor extends AbstractExecutorService {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//從阻塞隊列中不斷取出任務,若是取出的任務爲空,則循環終止
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//該方法爲空,能夠從新次方法,在任務執行開始前作一些處理
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//該方法爲空,能夠從新次方法,在任務執行結束後作一些處理
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//從阻塞隊列workerQueue中取出Task
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//循環
for (;;) {
int c = ctl.get();
//獲取線程池狀態
int rs = runStateOf(c);
//如下狀況中止循環:1 線程池狀態不是RUNNING(>= SHUTDOWN)2 線程池狀態>= STOP 或者阻塞隊列爲空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//遞減workCount
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判斷線程的IDLE超時機制是否生效,有兩種狀況:1 allowCoreThreadTimeOut = true,這是能夠手動
//設置的 2 當前線程數大於核心線程數
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//根據timed來決定是以poll超時等待的方式仍是以take()阻塞等待的方式從阻塞隊列中獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
複製代碼
因此你能夠理解了,runWorker()方法是在新建立線程的run()方法裏的,而runWorker()又不斷的調用getTask()方法去獲取阻塞隊列裏的任務,這樣就實現了線程的複用。
咱們先來看看ThreadPoolExecutor的構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
那麼這些參數咱們應該怎麼配置呢?要合理配置線程池就須要先了解咱們的任務特性,通常說來:
咱們根據這些屬性來一一分析這些參數的配置。
首先就是核心線程數corePoolSize與最大線程數maximumPoolSize。這個的配置咱們一般要考慮CPU同時執行線程的閾值。一旦超過這個閾值,CPU就須要花費不少 時間來完成線程的切換與調度,這樣會致使性能大幅下滑。
/** * CPU核心數,注意該方法並不可靠,它返回的有可能不是真實的CPU核心數,由於CPU在某些狀況下會對某些核 * 心進行睡眠處理,這種狀況返回的知識已激活的CPU核心數。 */
private static final int NUMBER_OF_CPU = Runtime.getRuntime().availableProcessors();
/** * 核心線程數 */
private static final int corePoolSize = Math.max(2, Math.min(NUMBER_OF_CPU - 1, 4));
/** * 最大線程數 */
private static final int maximumPoolSize = NUMBER_OF_CPU * 2 + 1;
複製代碼
至於keepAliveTime,該參數描述了線程不活動時存活的時間,若是是CPU密集型任務,則將時間設置的小一些,若是是IO密集型或者數據庫鏈接任務,則將時間設置的長一些。
咱們再來看看BlockingQueue參數的配置。BlockingQueue用來描述阻塞隊列。它的方法以四種形式存在,以此來知足不一樣需求。
拋出異常 | 特殊值 | 阻塞 | 超時 |
---|---|---|---|
add(e) | offer(e) | put(e) | offer(e, time, unit) |
remove() | poll() | take() | poll(time, unit) |
element() | peek() | 不可用 | 不可用 |
它有如下特色:
它的實現類有:
工做竊取:例若有兩個隊列A、B,各自幹本身的活,可是A效率比較高,很快把本身的活幹完了,因而勤快的A就會去竊取B的任務來幹,這是A、B會訪問同一個隊列,爲了減小A、B的競爭,規定竊取者A 只從雙端隊列的尾部拿任務,被竊取者B只從雙端隊列的頭部拿任務。
咱們最後來看看RejectedExecutionHandler參數的配置。
RejectedExecutionHandler用來描述線程數大於或等於線程池最大線程數時的拒絕策略,它的實現類有:
另外,Executors提供了一系列工廠方法用來建立線程池。這些線程是適用於不一樣的場景。
ThreadPoolExecutor裏提供了一些空方法,咱們能夠經過繼承ThreadPoolExecutor,複寫這些方法來實現對線程池的監控。
public class ThreadPoolExecutor extends AbstractExecutorService {
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
}
複製代碼
常見的監控指標有:
AsyncTask基於ThreadPoolExecutor實現,內部封裝了Thread+Handler,多用來執行耗時較短的任務。
一個簡單的AsyncTask例子
public class AsyncTaskDemo extends AsyncTask<String, Integer, String> {
/** * 在後臺任務開始執行以前調用,用於執行一些界面初始化操做,例如顯示一個對話框,UI線程。 */
@Override
protected void onPreExecute() {
super.onPreExecute();
}
/** * 執行後臺線程,執行完成能夠經過return語句返回,worker線程 * * @param strings params * @return result */
@Override
protected String doInBackground(String... strings) {
return null;
}
/** * 更新進度,UI線程 * * @param values progress */
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
}
/** * 後臺任務執行完成並經過return語句返回後會調用該方法,UI線程。 * * @param result result */
@Override
protected void onPostExecute(String result) {
super.onPostExecute(result);
}
/** * 後臺任務唄取消後回調 * * @param reason reason */
@Override
protected void onCancelled(String reason) {
super.onCancelled(reason);
}
/** * 後臺任務唄取消後回調 */
@Override
protected void onCancelled() {
super.onCancelled();
}
}
複製代碼
AsyncTask的使用很是的簡單,接下來咱們去分析AsyncTask的源碼實現。
AsyncTask流程圖
AsyncTask源碼的一開始就是個建立線程池的流程。
public abstract class AsyncTask<Params, Progress, Result> {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心線程數,最少2個,最多4個
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//線程不活動時的存活時間是30s
private static final int KEEP_ALIVE_SECONDS = 30;
//線程構建工廠,指定線程的名字
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//一個由鏈表結構組成的無界阻塞隊列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR;
//構建線程池
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
}
複製代碼
另外,咱們能夠經過AsyncTask.executeOnExecutor(Executor exec, Params... params) 來自定義線程池。
咱們再來看看構造方法。
public abstract class AsyncTask<Params, Progress, Result> {
//構造方法須要在UI線程裏調用
public AsyncTask() {
//建立一個Callable對象,WorkerRunnable實現了Callable接口
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
//建立一個FutureTask對象,該對象用來接收mWorker的結果
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
//將執行的結果經過發送給Handler處理,注意FutureTask的get()方法會阻塞直至結果返回
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//內部的Handler
private static class InternalHandler extends Handler {
public InternalHandler() {
//UI線程的Looper
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
//返回結果
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
//返回進度
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
}
複製代碼
能夠看到當咱們調用AsyncTask的構造方法時,就建立了一個FutureTask對象,它內部包裝了Callable對象(就是咱們要執行的任務),並在FutureTask對象的done()方法裏 將結果發送給Handler。
接着看看執行方法execute()。
public abstract class AsyncTask<Params, Progress, Result> {
//須要在UI線程裏調用
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
//任務執行前的處理,咱們能夠複寫次方法
onPreExecute();
mWorker.mParams = params;
//執行任務,exec爲sDefaultExecutor
exec.execute(mFuture);
return this;
}
}
複製代碼
接着看看這個sDefaultExecutor。
能夠看到sDefaultExecutor是個SerialExecutor對象,SerialExecutor實現了Executor接口。
public abstract class AsyncTask<Params, Progress, Result> {
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private static class SerialExecutor implements Executor {
//任務隊列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//當前執行的任務
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
//開始執行任務
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//取出隊列頭的任務開始執行
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
}
複製代碼
因此咱們沒調用一次AsyncTask.execute()方法就將FutureTask對象添加到隊列尾部,而後會從隊列頭部取出任務放入線程池中執行,因此你能夠看着這是一個串行執行器。
在Okhttp的任務調度器Dispatcher裏有關於線程池的配置
public final class Dispatcher {
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
}
複製代碼
你能夠看到它的配置:
這實際上是Excutors.newCachedThreadPool()緩存池的實現。總結來講就是新任務過來進入SynchronousQueue,它是一個單工模式的隊列,只傳遞任務,不存儲任務,而後就建立 新線程執行任務,線程不活動的存活時間爲60s。
Okhttp請求流程圖
在發起網絡請求時,每一個請求執行完成後都會調用client.dispatcher().finished(this)。
final class RealCall implements Call {
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//異步請求
client.dispatcher().finished(this);
}
}
}
}
複製代碼
咱們來看看client.dispatcher().finished(this)這個方法。
public final class Dispatcher {
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//將已經結束的請求call移除正在運行的隊列calls
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//異步請求promoteCalls爲true
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
//當前異步請求數大於最大請求數,不繼續執行
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
//異步等待隊列爲空,不繼續執行
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//遍歷異步等待隊列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//若是沒有超過相同host的最大請求數,則複用當前請求的線程
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//運行隊列達到上限,也再也不執行
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
}
複製代碼
因此你能夠看到Okhttp不是用線程池來控制線程個數,線程池裏的線程執行的都是正在運行請請求,控制線程的是Dispatcher,Dispatcher.promoteCalls()方法經過 最大請求數maxRequests和相同host最大請求數maxRequestsPerHost來控制異步請求不超過兩個最大值,在值範圍內不斷的將等待隊列readyAsyncCalls中的請求添加 到運行隊列runningAsyncCalls中去。