Android進程框架:線程與線程池

關於做者html

郭孝星,程序員,吉他手,主要從事Android平臺基礎架構方面的工做,歡迎交流技術方面的問題,能夠去個人Github提issue或者發郵件至guoxiaoxingse@163.com與我交流。java

文章目錄android

  • 一 線程原理
    • 1.1 線程建立
    • 1.2 線程調度
  • 二 線程同步
    • 2.1 volatile
    • 2.2 synchronized
  • 三 線程池
    • 3.1 線程池調度
    • 3.2 線程池配置
    • 3.1 線程池監控
  • 四 線程池應用
    • 4.1 AsyncTask
    • 4.2 Okhttp

本篇文章主要用來討論Java中多線程併發原理與實踐經驗,並非一篇使用例子教程,這方面內容能夠參考網上其餘文章。git

一 線程原理

1.1 線程建立

線程是比進程更加輕量級的調度單位,線程的引入能夠把進程的資源分配和執行調度分開,各個線程既能夠共享進程資源,又能夠獨立調度。程序員

一般你們都會這麼去解釋進程與線程的區別,在文章01Android進程框架:進程的啓動建立、啓動與調度流程中 咱們剖析了進程的本質,咱們這裏再簡單回憶一下。github

關於進程本質的描述:數據庫

咱們知道,代碼是靜態的,有代碼和資源組成的系統要想運行起來就須要一種動態的存在,進程就是程序的動態執行過程。何爲進程? 進程就是處理執行狀態的代碼以及相關資源的集合,包括代碼段、文件、信號、CPU狀態、內存地址空間等。數組

進程使用task_struct結構體來描述,以下所示:緩存

  • 代碼段:編譯後造成的一些指令
  • 數據段:程序運行時須要的數據
    • 只讀數據段:常量
    • 已初始化數據段:全局變量,靜態變量
    • 未初始化數據段(bss):未初始化的全局變量和靜態變量
  • 堆棧段:程序運行時動態分配的一些內存
  • PCB:進程信息,狀態標識等

咱們接着來看看Java線程的建立序列圖,以下所示:安全

能夠看到,最終調用pthread庫的pthread_create()方法建立了新的線程,該線程也使用task_struct結構體來描述,可是它沒有本身獨立的地址空間,而是與其所在的進程共享地址空間和資源。

因此你能夠發現,對於虛擬機而言,除了是否具備獨立的地址空間外,進程與線程並無本質上的區別。

咱們接着來看看線程是如何調度的。

1.2 線程調度

線程狀態流程圖圖

  • NEW:建立狀態,線程建立以後,可是還未啓動。
  • RUNNABLE:運行狀態,處於運行狀態的線程,但有可能處於等待狀態,例如等待CPU、IO等。
  • WAITING:等待狀態,通常是調用了wait()、join()、LockSupport.spark()等方法。
  • TIMED_WAITING:超時等待狀態,也就是帶時間的等待狀態。通常是調用了wait(time)、join(time)、LockSupport.sparkNanos()、LockSupport.sparkUnit()等方法。
  • BLOCKED:阻塞狀態,等待鎖的釋放,例如調用了synchronized增長了鎖。
  • TERMINATED:終止狀態,通常是線程完成任務後退出或者異常終止。

NEW、WAITING、TIMED_WAITING都比較好理解,咱們重點說一說RUNNABLE運行態和BLOCKED阻塞態。

線程進入RUNNABLE運行態通常分爲五種狀況:

  • 線程調用sleep(time)後查出了休眠時間
  • 線程調用的阻塞IO已經返回,阻塞方法執行完畢
  • 線程成功的獲取了資源鎖
  • 線程正在等待某個通知,成功的得到了其餘線程發出的通知
  • 線程處於掛起狀態,而後調用了resume()恢復方法,解除了掛起。

線程進入BLOCKED阻塞態通常也分爲五種狀況:

  • 線程調用sleep()方法主動放棄佔有的資源
  • 線程調用了阻塞式IO的方法,在該方法返回前,該線程被阻塞。
  • 線程視圖得到一個資源鎖,可是該資源鎖正被其餘線程鎖持有。
  • 線程正在等待某個通知
  • 線程調度器調用suspend()方法將該線程掛起

咱們再來看看和線程狀態相關的一些方法。

  • sleep()方法讓當前正在執行的線程在指定時間內暫停執行,正在執行的線程能夠經過Thread.currentThread()方法獲取。
  • yield()方法放棄線程持有的CPU資源,將其讓給其餘任務去佔用CPU執行時間。但放棄的時間不肯定,有可能剛剛放棄,立刻又得到CPU時間片。
  • wait()方法是當前執行代碼的線程進行等待,將當前線程放入預執行隊列,並在wait()所在的代碼處中止執行,知道接到通知或者被中斷爲止。該方法可使得調用該方法的線程釋放共享資源的鎖, 而後從運行狀態退出,進入等待隊列,直到再次被喚醒。該方法只能在同步代碼塊裏調用,不然會拋出IllegalMonitorStateException異常。
  • wait(long millis)方法等待某一段時間內是否有線程對鎖進行喚醒,若是超過了這個時間則自動喚醒。
  • notify()方法用來通知那些可能等待該對象的對象鎖的其餘線程,該方法能夠隨機喚醒等待隊列中等同一共享資源的一個線程,並使該線程退出等待隊列,進入可運行狀態。
  • notifyAll()方法能夠是全部正在等待隊列中等待同一共享資源的所有線程從等待狀態退出,進入可運行狀態,通常會是優先級高的線程先執行,可是根據虛擬機的實現不一樣,也有多是隨機執行。
  • join()方法可讓調用它的線程正常執行完成後,再去執行該線程後面的代碼,它具備讓線程排隊的做用。

二 線程同步

線程安全,一般所說的線程安全指的是相對的線程安全,它指的是對這個對象單獨的操做是線程安全的,咱們在調用的時候無需作額外的保障措施。

什麼叫相對安全?🤔

🤞舉個栗子

咱們知道Java裏的Vector是個線程安全的類,在多線程環境下對其插入、刪除和讀取都是安全的,但這僅限於每次只有一個線程對其操做,若是多個線程同時操做 Vector,那它就再也不是線程安全的了。

final Vector<String> vector = new Vector<>();

    while (true) {
        for (int i = 0; i < 10; i++) {
            vector.add("項:" + i);
        }

        Thread removeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < vector.size(); i++) {
                    vector.remove(i);
                }
            }
        });

        Thread printThread = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < vector.size(); i++) {
                    Log.d(TAG, vector.get(i));
                }
            }
        });

        removeThread.start();
        printThread.start();

        if (Thread.activeCount() >= 20) {
            return;
        }
    }
複製代碼

可是程序卻crash了

正確的作法應該是vector對象加上同步鎖,以下:

final Vector<String> vector = new Vector<>();

        while (true) {
            for (int i = 0; i < 10; i++) {
                vector.add("項:" + i);
            }

            Thread removeThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (vector){
                        for (int i = 0; i < vector.size(); i++) {
                            vector.remove(i);
                        }
                    }
                }
            });

            Thread printThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (vector){
                        for (int i = 0; i < vector.size(); i++) {
                            Log.d(TAG, vector.get(i));
                        }
                    }
                }
            });

            removeThread.start();
            printThread.start();

            if (Thread.activeCount() >= 20) {
                return;
            }
        }
複製代碼

2.1 volatile

volatile也是互斥同步的一種實現,不過它很是的輕量級。

volatile有兩條關鍵的語義:

  • 保證被volatile修飾的變量對全部線程都是可見的
  • 禁止進行指令重排序

要理解volatile關鍵字,咱們得先從Java的線程模型開始提及。如圖所示:

Java內存模型規定了全部字段(這些字段包括實例字段、靜態字段等,不包括局部變量、方法參數等,由於這些是線程私有的,並不存在競爭)都存在主內存中,每一個線程會 有本身的工做內存,工做內存裏保存了線程所使用到的變量在主內存裏的副本拷貝,線程對變量的操做只能在工做內存裏進行,而不能直接讀寫主內存,固然不一樣內存之間也 沒法直接訪問對方的工做內存,也就是說主內存時線程傳值的媒介。

咱們來理解第一句話:

保證被volatile修飾的變量對全部線程都是可見的

如何保證可見性?🤔

被volatile修飾的變量在工做內存修改後會被強制寫回主內存,其餘線程在使用時也會強制從主內存刷新,這樣就保證了一致性。

關於「保證被volatile修飾的變量對全部線程都是可見的」,有種常見的錯誤理解:

錯誤理解:因爲volatile修飾的變量在各個線程裏都是一致的,因此基於volatile變量的運算在多線程併發的狀況下是安全的。

這句話的前半部分是對的,後半部分卻錯了,所以它忘記考慮變量的操做是否具備原子性這一問題。

:point_up:舉個栗子

private volatile int start = 0;

    private void volatileKeyword() {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    start++;
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
        Log.d(TAG, "start = " + start);
    }

複製代碼

這段代碼啓動了10個線程,每次10次自增,按道理最終結果應該是100,可是結果並不是如此。

爲何會這樣?:thinking:

仔細看一下start++,它其實並不是一個原子操做,簡單來看,它有兩步:

  1. 取出start的值,由於有volatile的修飾,這時候的值是正確的。
  2. 自增,可是自增的時候,別的線程可能已經把start加大了,這種狀況下就有可能把較小的start寫回主內存中。

因此volatile只能保證可見性,在不符合如下場景下咱們依然須要經過加鎖來保證原子性:

  • 運算結果並不依賴變量當前的值,或者只有單一線程修改變量的值。(要麼結果不依賴當前值,要麼操做是原子性的,要麼只要一個線程修改變量的值)
  • 變量不須要與其餘狀態變量共同參與不變約束

比方說咱們會在線程里加個boolean變量,來判斷線程是否中止,這種狀況就很是適合使用volatile。

咱們再來理解第二句話。

  • 禁止進行指令重排序

什麼是指令重排序?🤔

指令重排序是值指令亂序執行,即在條件容許的狀況下,直接運行當前有能力當即執行的後續指令,避開爲獲取下一條指令所需數據而形成的等待,經過亂序執行的技術,提供執行效率。

指令重排序繪製被volatile修飾的變量的賦值操做前,添加一個內存屏障,指令重排序時不能把後面的指令重排序的內存屏障以前的位置。

關於指令重排序不是本篇文章重點討論的內容,更多細節能夠參考指令重排序

2.2 synchronized

synchronized是互斥同步的一種實現。

synchronized:當某個線程訪問被synchronized標記的方法或代碼塊時,這個線程便得到了該對象的鎖,其餘線程暫時沒法訪問這個方法,只有等待這個方法執行完畢或者代碼塊執行完畢,這個 線程纔會釋放該對象的鎖,其餘線程才能執行這個方法或代碼塊。

前面咱們已經說了volatile關鍵字,這裏咱們舉個例子來綜合分析volatile與synchronized關鍵字的使用。

:point_up:舉個栗子

public class Singleton {

    //volatile保證了:1 instance在多線程併發的可見性 2 禁止instance在操做是的指令重排序
    private volatile static Singleton instance;

    public static Singleton getInstance() {
        //第一次判空,保證沒必要要的同步
        if (instance == null) {
            //synchronized對Singleton加全局所,保證每次只要一個線程建立實例
            synchronized (Singleton.class) {
                //第二次判空時爲了在null的狀況下建立實例
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}
複製代碼

這是一個經典的DSL單例。

它的字節碼以下:

能夠看到被synchronized同步的代碼塊,會在先後分別加上monitorenter和monitorexit,這兩個字節碼都須要指定加鎖和解鎖的對象。

關於加鎖和解鎖的對象:

  • synchronized代碼塊 :同步代碼塊,做用範圍是整個代碼塊,做用對象是調用這個代碼塊的對象。
  • synchronized方法 :同步方法,做用範圍是整個方法,做用對象是調用這個方法的對象。
  • synchronized靜態方法 :同步靜態方法,做用範圍是整個靜態方法,做用對象是調用這個類的全部對象。
  • synchronized(this):做用範圍是該對象中全部被synchronized標記的變量、方法或代碼塊,做用對象是對象自己。
  • synchronized(ClassName.class) :做用範圍是靜態的方法或者靜態變量,做用對象是Class對象。

synchronized(this)添加的是對象鎖,synchronized(ClassName.class)添加的是類鎖,它們的區別以下:

對象鎖:Java的全部對象都含有1個互斥鎖,這個鎖由JVM自動獲取和釋放。線程進入synchronized方法的時候獲取該對象的鎖,固然若是已經有線程獲取了這個對象的鎖,那麼當前線 程會等待;synchronized方法正常返回或者拋異常而終止,JVM會自動釋放對象鎖。這裏也體現了用synchronized來加鎖的好處,方法拋異常的時候,鎖仍然能夠由JVM來自動釋放。

類鎖:對象鎖是用來控制實例方法之間的同步,類鎖是用來控制靜態方法(或靜態變量互斥體)之間的同步。其實類鎖只是一個概念上的東西,並非真實存在的,它只是用來幫助咱們理 解鎖定實例方法和靜態方法的區別的。咱們都知道,java類可能會有不少個對象,可是隻有1個Class對象,也就是說類的不一樣實例之間共享該類的Class對象。Class對象其實也僅僅是1個 java對象,只不過有點特殊而已。因爲每一個java對象都有1個互斥鎖,而類的靜態方法是須要Class對象。因此所謂的類鎖,不過是Class對象的鎖而已。獲取類的Class對象有好幾種,最簡 單的就是MyClass.class的方式。 類鎖和對象鎖不是同一個東西,一個是類的Class對象的鎖,一個是類的實例的鎖。也就是說:一個線程訪問靜態synchronized的時候,容許另外一個線程訪 問對象的實例synchronized方法。反過來也是成立的,由於他們須要的鎖是不一樣的。

關不一樣步鎖還有ReentrantLock,eentrantLockR相對於synchronized具備等待可中斷、公平鎖等更多功能,這裏限於篇幅,再也不展開。

三 線程池

咱們知道線程的建立、切換與銷燬都會花費比較大代價,因此很天然的咱們使用線程池來複用和管理線程。Java裏的線程池咱們一般經過ThreadPoolExecutor來實現。 接下來咱們就來分析ThreadPoolExecutor的相關原理,以及ThreadPoolExecutor在Android上的應用AsyncTask。

3.1 線程池調度

線程池有五種運行狀態,以下所示:

線程池狀態圖

  • RUNNING:能夠接受新任務,也能夠處理等待隊列裏的任務。
  • SHUTDOWN:不接受新任務,但能夠處理等待隊列裏的任務。
  • STOP:不接受新的任務,再也不處理等待隊列裏的任務。中斷正在處理的任務。
  • TIDYING:全部任務都已經處理完了,當前線程池沒有有效的線程,而且即將調用terminated()方法。
  • TERMINATED:調用了terminated()方法,線程池終止。

另外,ThreadPoolExecutor是用一個AtomicInteger來記錄線程池狀態和線程池裏的線程數量的,以下所示:

  • 低29位:用來存放線程數
  • 高3位:用來存放線程池狀態
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;// 111
private static final int SHUTDOWN   =  0 << COUNT_BITS;// 000
private static final int STOP       =  1 << COUNT_BITS;// 001
private static final int TIDYING    =  2 << COUNT_BITS;// 010
private static final int TERMINATED =  3 << COUNT_BITS;// 110

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//線程池狀態
private static int workerCountOf(int c) { return c & CAPACITY; }//線程池當前線程數
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

在正式介紹線程池調度原理以前,咱們先來回憶一下Java實現任務的兩個接口:

  • Runnable:在run()方法裏完成任務,無返回值,且不會拋出異常。
  • Callable:在call()方法裏完成任務,有返回值,且可能拋出異常。

另外,還有個Future接口,它能夠對Runnable、Callable執行的任務進行判斷任務是否完成,中斷任務以及獲取任務結果的操做。咱們一般會使用它的實現類FutureTask,FutureTask是一個Future、Runnable 以及Callable的包裝類。利用它能夠很方便的完成Future接口定義的操做。FutureTask內部的線程阻塞是基於LockSupport來實現的。

咱們接下來看看線程池是和執行任務的。

ThreadPoolExecutor調度流程圖

execute(Runnable command)

public class ThreadPoolExecutor extends AbstractExecutorService {
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //1. 若線程池狀態是RUNNING,線程池大小小於配置的核心線程數,則能夠在線程池中建立新線程執行新任務。
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //2. 若線程池狀態是RUNNING,線程池大小大於配置的核心線程數,則嘗試將任務插入阻塞隊列進行等待
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //若插入成功,則將次檢查線程池的狀態是否爲RUNNING,若是不是則移除當前任務並進入拒絕策略。
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //若是線程池中的線程數爲0,即線程池中的線程都執行完畢處於SHUTDOWN狀態,此時添加了一個null任務
                //(由於SHUTDOWN狀態再也不接受新任務)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //3. 若沒法插入阻塞隊列,則嘗試建立新線程,建立失敗則進入拒絕策略。
            else if (!addWorker(command, false))
                reject(command);
        }
}
複製代碼
  1. 若線程池大小小於配置的核心線程數,則能夠在線程池中建立新線程執行新任務。
  2. 若線程池狀態是RUNNING,線程池大小大於配置的核心線程數,則嘗試將任務插入阻塞隊列進行等待。若插入成功,爲了健壯性考慮,則將次檢查線程池的狀態是否爲RUNNING ,若是不是則移除當前任務並進入拒絕策略。若是線程池中的線程數爲0,即線程池中的線程都執行完畢處於SHUTDOWN狀態,此時添加了一個null任務(由於SHUTDOWN狀態再也不接受 新任務)。
  3. 若沒法插入阻塞隊列,則嘗試建立新線程,建立失敗則進入拒絕策略。

這個其實很好理解,打個比方。咱們公司的一個小組來完成任務,

  • 若是任務數量小於小組人數(核心線程數),則指派小組裏人的完成;
  • 若是任務數量大於小組人數,則去招聘新人來完成,則將任務加入排期等待(阻塞隊列)。
  • 若是沒有排期,則試着去招新人來完成任務(最大線程數),若是招新人也完成不了,說明這不是人乾的活,則去找產品經理砍需求(拒絕策略)。

addWorker(Runnable firstTask, boolean core)

addWorker(Runnable firstTask, boolean core) 表示添加個Worker,Worker實現了Runnable接口,是對Thread的封裝,該方法添加完Worker後,則調用runWorker()來啓動線程。

public class ThreadPoolExecutor extends AbstractExecutorService {
    
     private boolean addWorker(Runnable firstTask, boolean core) {
            //重試標籤
            retry:
            for (;;) {
                int c = ctl.get();
                //獲取當前線程池狀態
                int rs = runStateOf(c);
    
                //如下狀況表示再也不接受新任務:1 線程池沒有處於RUNNING狀態 2 要執行的任務爲空 3 阻塞隊列已滿
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //獲取線程池當前的線程數
                    int wc = workerCountOf(c);
                    //若是超出容量,則再也不接受新任務,core表示是否使用corePoolSize做爲比較標準
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //增長線程數
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    //若是線程池狀態發生變化,從新開始循環
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            //線程數增長成功,開始添加新線程,Worker是Thread的封裝類
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    //加鎖
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            //將新啓動的線程添加到線程池中
                            workers.add(w);
                            //更新線程池中線程的數量,注意這個數量不能超過largestPoolSize
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //調用runWorker()方法,開始執行線程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
}
複製代碼

runWorker(Worker w)

runWorker()方法是整個阻塞隊列的核心循環,在這個循環中,線程池會不斷的從阻塞隊列workerQueue中取出的新的task並執行。

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    final void runWorker(Worker w) {
           Thread wt = Thread.currentThread();
           Runnable task = w.firstTask;
           w.firstTask = null;
           w.unlock(); // allow interrupts
           boolean completedAbruptly = true;
           try {
               //從阻塞隊列中不斷取出任務,若是取出的任務爲空,則循環終止
               while (task != null || (task = getTask()) != null) {
                   w.lock();
                   // If pool is stopping, ensure thread is interrupted;
                   // if not, ensure thread is not interrupted. This
                   // requires a recheck in second case to deal with
                   // shutdownNow race while clearing interrupt
                   if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                         runStateAtLeast(ctl.get(), STOP))) &&
                       !wt.isInterrupted())
                       wt.interrupt();
                   try {
                       //該方法爲空,能夠從新次方法,在任務執行開始前作一些處理
                       beforeExecute(wt, task);
                       Throwable thrown = null;
                       try {
                           task.run();
                       } catch (RuntimeException x) {
                           thrown = x; throw x;
                       } catch (Error x) {
                           thrown = x; throw x;
                       } catch (Throwable x) {
                           thrown = x; throw new Error(x);
                       } finally {
                           //該方法爲空,能夠從新次方法,在任務執行結束後作一些處理
                           afterExecute(task, thrown);
                       }
                   } finally {
                       task = null;
                       w.completedTasks++;
                       w.unlock();
                   }
               }
               completedAbruptly = false;
           } finally {
               processWorkerExit(w, completedAbruptly);
           }
       }
       
        //從阻塞隊列workerQueue中取出Task
        private Runnable getTask() {
               boolean timedOut = false; // Did the last poll() time out?
               //循環
               for (;;) {
                   int c = ctl.get();
                   //獲取線程池狀態
                   int rs = runStateOf(c);
       
                   //如下狀況中止循環:1 線程池狀態不是RUNNING(>= SHUTDOWN)2 線程池狀態>= STOP 或者阻塞隊列爲空
                   if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                       //遞減workCount
                       decrementWorkerCount();
                       return null;
                   }
       
                   int wc = workerCountOf(c);
       
                   // 判斷線程的IDLE超時機制是否生效,有兩種狀況:1 allowCoreThreadTimeOut = true,這是能夠手動
                   //設置的 2 當前線程數大於核心線程數
                   boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
       
                   if ((wc > maximumPoolSize || (timed && timedOut))
                       && (wc > 1 || workQueue.isEmpty())) {
                       if (compareAndDecrementWorkerCount(c))
                           return null;
                       continue;
                   }
       
                   try {
                       //根據timed來決定是以poll超時等待的方式仍是以take()阻塞等待的方式從阻塞隊列中獲取任務
                       Runnable r = timed ?
                           workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                           workQueue.take();
                       if (r != null)
                           return r;
                       timedOut = true;
                   } catch (InterruptedException retry) {
                       timedOut = false;
                   }
               }
           }
}
複製代碼

因此你能夠理解了,runWorker()方法是在新建立線程的run()方法裏的,而runWorker()又不斷的調用getTask()方法去獲取阻塞隊列裏的任務,這樣就實現了線程的複用。

3.2 線程池配置

咱們先來看看ThreadPoolExecutor的構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
  • int corePoolSize:核心線程池大小
  • int maximumPoolSize:線程池最大容量大小
  • long keepAliveTime:線程不活動時存活的時間
  • TimeUnit unit:時間單位
  • BlockingQueue workQueue:任務隊列
  • ThreadFactory threadFactory:線程工程
  • RejectedExecutionHandler handler:線程拒絕策略

那麼這些參數咱們應該怎麼配置呢?要合理配置線程池就須要先了解咱們的任務特性,通常說來:

  • 任務性質:CPU密集型、IO密集型、混合型
  • 任務優先級:低、中、高
  • 任務執行時間:短、中、長
  • 任務依賴性:是否依賴其餘資源,數據庫、網絡

咱們根據這些屬性來一一分析這些參數的配置。

首先就是核心線程數corePoolSize與最大線程數maximumPoolSize。這個的配置咱們一般要考慮CPU同時執行線程的閾值。一旦超過這個閾值,CPU就須要花費不少 時間來完成線程的切換與調度,這樣會致使性能大幅下滑。

/** * CPU核心數,注意該方法並不可靠,它返回的有可能不是真實的CPU核心數,由於CPU在某些狀況下會對某些核 * 心進行睡眠處理,這種狀況返回的知識已激活的CPU核心數。 */
private static final int NUMBER_OF_CPU = Runtime.getRuntime().availableProcessors();

/** * 核心線程數 */
private static final int corePoolSize = Math.max(2, Math.min(NUMBER_OF_CPU - 1, 4));

/** * 最大線程數 */
private static final int maximumPoolSize = NUMBER_OF_CPU * 2 + 1;
複製代碼

至於keepAliveTime,該參數描述了線程不活動時存活的時間,若是是CPU密集型任務,則將時間設置的小一些,若是是IO密集型或者數據庫鏈接任務,則將時間設置的長一些。

咱們再來看看BlockingQueue參數的配置。BlockingQueue用來描述阻塞隊列。它的方法以四種形式存在,以此來知足不一樣需求。

拋出異常 特殊值 阻塞 超時
add(e) offer(e) put(e) offer(e, time, unit)
remove() poll() take() poll(time, unit)
element() peek() 不可用 不可用

它有如下特色:

  • 不支持null元素
  • 線程安全

它的實現類有:

  • ArrayBlockingQueue :一個數組實現的有界阻塞隊列,此隊列按照FIFO的原則對元素進行排序,支持公平訪問隊列(可重入鎖實現ReenttrantLock)。
  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列,此隊列默認和最大長度爲Integer.MAX_VALUE,按照FIFO的原則對元素進行排序。
  • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列,默認狀況下采用天然順序排列,也能夠指定Comparator。
  • DelayQueue:一個支持延時獲取元素的無界阻塞隊列,建立元素時能夠指定多久之後才能從隊列中獲取當前元素,經常使用於緩存系統設計與定時任務調度等。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。存入操做必須等待獲取操做,反之亦然,它至關於一個傳球手,很是適合傳遞性場景。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列,與LinkedBlockingQueue相比多了transfer和tryTranfer方法,該方法在有消費者等待接收元素時會當即將元素傳遞給消費者。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙端阻塞隊列,能夠從隊列的兩端插入和刪除元素。由於出入口都有兩個,能夠減小一半的競爭。適用於工做竊取的場景。

工做竊取:例若有兩個隊列A、B,各自幹本身的活,可是A效率比較高,很快把本身的活幹完了,因而勤快的A就會去竊取B的任務來幹,這是A、B會訪問同一個隊列,爲了減小A、B的競爭,規定竊取者A 只從雙端隊列的尾部拿任務,被竊取者B只從雙端隊列的頭部拿任務。

咱們最後來看看RejectedExecutionHandler參數的配置。

RejectedExecutionHandler用來描述線程數大於或等於線程池最大線程數時的拒絕策略,它的實現類有:

  • ThreadPoolExecutor.AbortPolicy:默認策略,當線程池中線程的數量大於或者等於最大線程數時,拋出RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:當線程池中線程的數量大於或者等於最大線程數時,默默丟棄掉不能執行的新任務,不報任何異常。
  • ThreadPoolExecutor.CallerRunsPolicy:當線程池中線程的數量大於或者等於最大線程數時,若是線程池沒有被關閉,則直接在調用者的線程裏執行該任務。
  • ThreadPoolExecutor.DiscardOldestPolicy:當線程池中線程的數量大於或者等於最大線程數時,丟棄阻塞隊列頭部的任務(即等待最近的任務),而後從新添加當前任務。

另外,Executors提供了一系列工廠方法用來建立線程池。這些線程是適用於不一樣的場景。

  • newCachedThreadPool():無界可自動回收線程池,查看線程池中有沒有之前創建的線程,若是有則複用,若是沒有則創建一個新的線程加入池中,池中的線程超過60s不活動則自動終止。適用於生命 週期比較短的異步任務。
  • newFixedThreadPool(int nThreads):固定大小線程池,與newCachedThreadPool()相似,可是池中持有固定數目的線程,不能隨時建立線程,若是建立新線程時,超過了固定 線程數,則放在隊列裏等待,直到池中的某個線程被移除時,才加入池中。適用於很穩定、很正規的併發線程,多用於服務器。
  • newScheduledThreadPool(int corePoolSize):週期任務線程池,該線程池的線程能夠按照delay依次執行線程,也能夠週期執行。
  • newSingleThreadExecutor():單例線程池,任意時間內池中只有一個線程。

3.3 線程池監控

ThreadPoolExecutor裏提供了一些空方法,咱們能夠經過繼承ThreadPoolExecutor,複寫這些方法來實現對線程池的監控。

public class ThreadPoolExecutor extends AbstractExecutorService {
       
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
}
複製代碼

常見的監控指標有:

  • taskCount:線程池須要執行的任務數量。
  • completedTaskCount:線程池在運行過程當中已完成的任務數量,小於或等於taskCount。
  • largestPoolSize:線程池裏曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否曾經滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過。
  • getPoolSize:線程池的線程數量。若是線程池不銷燬的話,線程池裏的線程不會自動銷燬,因此這個大小隻增不減。
  • getActiveCount:獲取活動的線程數。

四 線程池應用

4.1 AsyncTask

AsyncTask基於ThreadPoolExecutor實現,內部封裝了Thread+Handler,多用來執行耗時較短的任務。

一個簡單的AsyncTask例子

public class AsyncTaskDemo extends AsyncTask<String, Integer, String> {

    /** * 在後臺任務開始執行以前調用,用於執行一些界面初始化操做,例如顯示一個對話框,UI線程。 */
    @Override
    protected void onPreExecute() {
        super.onPreExecute();
    }

    /** * 執行後臺線程,執行完成能夠經過return語句返回,worker線程 * * @param strings params * @return result */
    @Override
    protected String doInBackground(String... strings) {
        return null;
    }

    /** * 更新進度,UI線程 * * @param values progress */
    @Override
    protected void onProgressUpdate(Integer... values) {
        super.onProgressUpdate(values);
    }


    /** * 後臺任務執行完成並經過return語句返回後會調用該方法,UI線程。 * * @param result result */
    @Override
    protected void onPostExecute(String result) {
        super.onPostExecute(result);
    }

    /** * 後臺任務唄取消後回調 * * @param reason reason */
    @Override
    protected void onCancelled(String reason) {
        super.onCancelled(reason);
    }

    /** * 後臺任務唄取消後回調 */
    @Override
    protected void onCancelled() {
        super.onCancelled();
    }
}
複製代碼

AsyncTask的使用很是的簡單,接下來咱們去分析AsyncTask的源碼實現。

AsyncTask流程圖

AsyncTask源碼的一開始就是個建立線程池的流程。

public abstract class AsyncTask<Params, Progress, Result> {
    
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        //核心線程數,最少2個,最多4個
        private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        //線程不活動時的存活時間是30s
        private static final int KEEP_ALIVE_SECONDS = 30;
    
        //線程構建工廠,指定線程的名字
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
            }
        };
    
        //一個由鏈表結構組成的無界阻塞隊列
        private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
    
        public static final Executor THREAD_POOL_EXECUTOR;
    
        //構建線程池
        static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
        }
}
複製代碼

另外,咱們能夠經過AsyncTask.executeOnExecutor(Executor exec, Params... params) 來自定義線程池。

咱們再來看看構造方法。

public abstract class AsyncTask<Params, Progress, Result> {
    
      //構造方法須要在UI線程裏調用
      public AsyncTask() {
          //建立一個Callable對象,WorkerRunnable實現了Callable接口
          mWorker = new WorkerRunnable<Params, Result>() {
              public Result call() throws Exception {
                  mTaskInvoked.set(true);
  
                  Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                  //noinspection unchecked
                  Result result = doInBackground(mParams);
                  Binder.flushPendingCommands();
                  return postResult(result);
              }
          };
  
          //建立一個FutureTask對象,該對象用來接收mWorker的結果
          mFuture = new FutureTask<Result>(mWorker) {
              @Override
              protected void done() {
                  try {
                      //將執行的結果經過發送給Handler處理,注意FutureTask的get()方法會阻塞直至結果返回
                      postResultIfNotInvoked(get());
                  } catch (InterruptedException e) {
                      android.util.Log.w(LOG_TAG, e);
                  } catch (ExecutionException e) {
                      throw new RuntimeException("An error occurred while executing doInBackground()",
                              e.getCause());
                  } catch (CancellationException e) {
                      postResultIfNotInvoked(null);
                  }
              }
          };
      } 
      
      private void postResultIfNotInvoked(Result result) {
          final boolean wasTaskInvoked = mTaskInvoked.get();
          if (!wasTaskInvoked) {
              postResult(result);
          }
      }
  
      private Result postResult(Result result) {
          @SuppressWarnings("unchecked")
          Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                  new AsyncTaskResult<Result>(this, result));
          message.sendToTarget();
          return result;
      }
      
     //內部的Handler 
     private static class InternalHandler extends Handler {
        public InternalHandler() {
            //UI線程的Looper
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                //返回結果
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                //返回進度
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
     }
}
複製代碼

能夠看到當咱們調用AsyncTask的構造方法時,就建立了一個FutureTask對象,它內部包裝了Callable對象(就是咱們要執行的任務),並在FutureTask對象的done()方法裏 將結果發送給Handler。

接着看看執行方法execute()。

public abstract class AsyncTask<Params, Progress, Result> {
    
        //須要在UI線程裏調用
        @MainThread
        public final AsyncTask<Params, Progress, Result> execute(Params... params) {
            return executeOnExecutor(sDefaultExecutor, params);
        }

        @MainThread
        public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
            if (mStatus != Status.PENDING) {
                switch (mStatus) {
                    case RUNNING:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task is already running.");
                    case FINISHED:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task has already been executed "
                                + "(a task can be executed only once)");
                }
            }
    
            mStatus = Status.RUNNING;
            //任務執行前的處理,咱們能夠複寫次方法
            onPreExecute();
    
            mWorker.mParams = params;
            //執行任務,exec爲sDefaultExecutor
            exec.execute(mFuture);
    
            return this;
        }
}
複製代碼

接着看看這個sDefaultExecutor。

能夠看到sDefaultExecutor是個SerialExecutor對象,SerialExecutor實現了Executor接口。

public abstract class AsyncTask<Params, Progress, Result> {
    
        public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
        private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
        
        private static class SerialExecutor implements Executor {
            //任務隊列
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            //當前執行的任務
            Runnable mActive;
    
            public synchronized void execute(final Runnable r) {
                
                mTasks.offer(new Runnable() {
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            scheduleNext();
                        }
                    }
                });
                if (mActive == null) {
                    //開始執行任務
                    scheduleNext();
                }
            }
    
            protected synchronized void scheduleNext() {
                //取出隊列頭的任務開始執行
                if ((mActive = mTasks.poll()) != null) {
                    THREAD_POOL_EXECUTOR.execute(mActive);
                }
            }
        }
}
複製代碼

因此咱們沒調用一次AsyncTask.execute()方法就將FutureTask對象添加到隊列尾部,而後會從隊列頭部取出任務放入線程池中執行,因此你能夠看着這是一個串行執行器。

4.2 Okhttp

在Okhttp的任務調度器Dispatcher裏有關於線程池的配置

public final class Dispatcher {
    
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
}
複製代碼

你能夠看到它的配置:

  • 核心線程數爲0,最大線程數爲Integer.MAX_VALUE,不對核心線程數進行限制,隨時建立新的線程,空閒存活時間爲60s,用完即走。這也比較符合網絡請求的特性。
  • 阻塞隊列爲SynchronousQueue,該隊列不存儲任務,只傳遞任務,因此把任務添加進去就會執行。

這實際上是Excutors.newCachedThreadPool()緩存池的實現。總結來講就是新任務過來進入SynchronousQueue,它是一個單工模式的隊列,只傳遞任務,不存儲任務,而後就建立 新線程執行任務,線程不活動的存活時間爲60s。

Okhttp請求流程圖

在發起網絡請求時,每一個請求執行完成後都會調用client.dispatcher().finished(this)。

final class RealCall implements Call {
    
  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        //異步請求
        client.dispatcher().finished(this);
      }
    }
  }
}
複製代碼

咱們來看看client.dispatcher().finished(this)這個方法。

public final class Dispatcher {
    
  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      //將已經結束的請求call移除正在運行的隊列calls
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //異步請求promoteCalls爲true
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

    private void promoteCalls() {
      //當前異步請求數大於最大請求數,不繼續執行
      if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
      //異步等待隊列爲空,不繼續執行
      if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
  
      //遍歷異步等待隊列
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall call = i.next();
  
        //若是沒有超過相同host的最大請求數,則複用當前請求的線程
        if (runningCallsForHost(call) < maxRequestsPerHost) {
          i.remove();
          runningAsyncCalls.add(call);
          executorService().execute(call);
        }
  
        //運行隊列達到上限,也再也不執行
        if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
      }
    }
}
複製代碼

因此你能夠看到Okhttp不是用線程池來控制線程個數,線程池裏的線程執行的都是正在運行請請求,控制線程的是Dispatcher,Dispatcher.promoteCalls()方法經過 最大請求數maxRequests和相同host最大請求數maxRequestsPerHost來控制異步請求不超過兩個最大值,在值範圍內不斷的將等待隊列readyAsyncCalls中的請求添加 到運行隊列runningAsyncCalls中去。

相關文章
相關標籤/搜索