Java線程池源碼分析與總結(下)

使用ThreadPoolExecutor直接建立線程池

  • 使用有界隊列和有限數量的線程數會保證安全

線程池的狀態

  • 在關閉線程池章節中,查看源碼實際上會發現線程池有許多狀態:java

    /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). */
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    複製代碼
    • 將註釋翻譯下來就是對着幾個線程池狀態的具體描述:git

      • 其中五個狀態程序員

        RUNNING:接收新的任務,處理隊列中的任務;github

        SHUTDOWN:不接收新的任務,但處理隊列中的任務編程

        STOP:不接收新的任務,不處理隊列中的任務,中斷正在執行的任務數組

        TIDYING:全部任務都終止,有效線程數爲0線程過分到TIDYING時會調用terminated鉤子方法緩存

        TERMINATED:terminated()方法執行完畢後進入該狀態;安全

      • 狀態之間的轉換markdown

        RUNNING -> SHUTDOWN:調用shutdown方法;網絡

        (RUNNING or SHUTDOWN) -> STOP:調用shutdownNow方法;

        SHUTDOWN -> TIDYING:當線程池和任務隊列都爲空(隊列中沒有未執行的任務了,而且全部線程都完成了工做處於賦閒狀態);

        STOP -> TIDYING:當線程池中工做線程數量爲0(其實就是變爲stop狀態,對全部正在執行任務的線程執行中斷,也再也不處理隊列中未處理的任務,一旦中斷所有完成,全部工做線程數量就爲0了,直接進入tidying狀態,也無論隊列中的任務了);

        TIDYING -> TERMINATED:當terminated方法執行完畢;

      • 狀態轉換示意圖

        image-20210419143355740
  • 線程池狀態是由命名爲ctl的AtomicIntegr的成員變量持有的(共32位),包含如下兩個信息:

    • 線程池狀態-最高3位

    • 線程池中線程數量-低29位

      // 初始化線程池狀態-RUNNING 0工做線程
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      // 設置位數 高3位與低29位分別表示線程池狀態與線程池工做線程數量
      private static final int COUNT_BITS = Integer.SIZE - 3;
      // 線程的最大數量大概是5億多
      private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
      
      // runState is stored in the high-order bits
      private static final int RUNNING    = -1 << COUNT_BITS;  // 111
      private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 000
      private static final int STOP       =  1 << COUNT_BITS;  // 001
      private static final int TIDYING    =  2 << COUNT_BITS;  // 010
      private static final int TERMINATED =  3 << COUNT_BITS;  // 011
      
      // Packing and unpacking ctl
      // 根據ctl獲取線程池狀態
      private static int runStateOf(int c) { return c & ~CAPACITY; }
      // 根據ctl獲取線程池中工做線程數量
      private static int workerCountOf(int c) { return c & CAPACITY; }
      // 使用runstate與workercount組裝ctl,初始狀態下rs 爲RUNNING wc爲0
      private static int ctlOf(int rs, int wc) { return rs | wc; }
      
      private static boolean runStateLessThan(int c, int s) {
        return c < s;
      }
      
      private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
      }
      // 注意大小比較關係
      private static boolean isRunning(int c) {
        return c < SHUTDOWN;
      }
      複製代碼
      • 事實上Integer.MAX_VALUE大於CAPACITY,因此能夠說實際上下邊的Executors中的構造函數中的最大線程池數量是根本沒法達到的
      • 爲何要把線程池數量和線程池狀態維護在一個變量中?
        • 由於事實上是須要維護線程池內有效線程數量和線程池狀態的一致性的(源碼中實際上許多地方是同時判斷線程池狀態與線程池內有效線程的數量的),若是兩者分開維護會由於維護兩者的一致性而浪費鎖資源;
        • 而且能夠經過位運算去分別計算獲得線程池狀態與工做線程數量,效率也是很高的

ThreadPoolExecutor的總體架構與運行流程

image-20210419133922120

  • 線程池在內部實際上構建了一個生產者消費者模型,將線程和任務二者解耦,並不直接關聯,從而良好的緩衝任務,複用線程。線程池的運行主要分紅兩部分:任務管理、線程管理。任務管理部分充當生產者的角色,當任務提交後,線程池會判斷該任務後續的流轉:(1)直接申請線程執行該任務;(2)緩衝到隊列中等待線程執行;(3)拒絕該任務。線程管理部分是消費者,它們被統一維護在線程池內,根據任務請求進行線程的分配,當線程執行完任務後則會繼續獲取新的任務去執行,最終當線程獲取不到任務的時候,線程就會被回收

Executor框架內的核心類--ThreadPoolExecutor

  • 學習ThreadPoolExecutor構造方法(針對參數最多的學習)
public ThreadPoolExecutor(int corePoolSize, ​ int maximumPoolSize, ​ long keepAliveTime, ​ TimeUnit unit, ​ BlockingQueue<Runnable> workQueue, ​ ThreadFactory threadFactory, ​ RejectedExecutionHandler handler) {

​        if (corePoolSize < 0 ||

​            maximumPoolSize <= 0 ||

​            maximumPoolSize < corePoolSize ||

​            keepAliveTime < 0)

​            throw new IllegalArgumentException();

​        if (workQueue == null || threadFactory == null || handler == null)

​            throw new NullPointerException();

​        this.acc = System.getSecurityManager() == null ?

​                null :

​                AccessController.getContext();

​        this.corePoolSize = corePoolSize;

​        this.maximumPoolSize = maximumPoolSize;

​        this.workQueue = workQueue;

​        this.keepAliveTime = unit.toNanos(keepAliveTime);

​        this.threadFactory = threadFactory;

​        this.handler = handler;

​    }
複製代碼
  • ThreadPoolExecutor 3 個最重要的參數:

    • corePoolSize : 核心線程數定義了最小能夠同時運行的線程數量,所謂最小的含義就是,這些線程建立後即使沒有任務執行,是空閒的,也要維持運行,除非設置了allowCoreThreadTimeOut,若是設置爲true,則在超過keepAliveTime以後,空閒的核心線程也會被回收
      • 注意這裏會有一個誤解須要澄清:並非線程池啓動起來後就當即維護起corePoolSize個線程,也是按需求來的,最早分配任務的corePoolSize自動成爲核心線程
    • maximumPoolSize : 當隊列中存放的任務達到隊列容量的時候,當前能夠同時運行的線程數量變爲最大線程數。(注意是能夠同時運行的最大的線程數量,不是當前正在運行的線程數量
    • workQueue: 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,若是達到的話,新任務就會被存放在隊列中。拿隊列當緩存,直到隊列放滿報錯或者有線程空閒推出隊列
      • 須要注意的是這個隊列存儲的僅僅是execute方法提交的Runnable任務,而不是其餘的什麼複雜的結構
  • ThreadPoolExecutor其餘常見參數:

    • keepAliveTime:當線程池中的線程數量大於 corePoolSize 的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬(注意這裏針對的是核心線程外的線程,核心線程也就是最小維持數量的線程會一直維持運行),而是會等待,直到等待的時間超過了 keepAliveTime纔會被回收銷燬;
      • 其起做用的機制在於getTask函數中調用的阻塞隊列的poll函數的超時設置
    • unit : keepAliveTime 參數的時間單位。
    • threadFactory :executor 建立新線程的時候會用到。
    • handler :飽和策略。關於飽和策略下面單獨介紹一下。當提交的任務過多而不能及時處理(這裏的及時處理指的是任務隊列已經滿了,而且線程池已經達到了容許的最大線程量,而且都在工做)時,咱們能夠定製策略來處理任務
  • 飽和策略(對應的是任務拒絕模塊)

    • 定義:飽和就是當任務隊列滿了,而且線程池當前同時運行的線程數量已經達到設定的最大值時的狀態,更準確地定義應該是任務拒絕策略,而不只僅是飽和策略,由於線程池飽和的時候會執行拒絕,線程池狀態不是running狀態時,也要對新提交的任務執行拒絕策略

    • 任務的拒絕是經過reject函數完成的, 默認提供4個拒絕策略,固然也能夠實現本身的拒絕策略

      final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
      }
      複製代碼
    • ThreadPoolExecutor.AbortPolicy:拋出 RejectedExecutionException來拒絕新任務的處理,此policy也是使用Executors工具類建立線程池以及咱們不指定飽和策略使用ThreadPoolExecutor構造函數時的默認的飽和策略

      • 若是是比較關鍵的業務,推薦使用此拒絕策略,這樣在系統不能承載更多任務時,及時經過異常發現
    • ThreadPoolExecutor.CallerRunsPolicy:調用執行本身的線程運行任務,也就是直接在調用execute方法的線程(通常是主線程)中運行(run)被拒絕的任務,若是執行程序(線程池)已關閉,則會丟棄該任務。所以這種策略會下降對於新任務提交速度,影響程序的總體性能(由於Main線程去處理新提交的任務去了,就沒法處理新的請求了)。若是您的應用程序能夠承受此延遲而且你要求任何一個任務請求都要被執行的話,你能夠選擇這個策略

      • 此策略適合處理大量計算任務的任務類型
        • 多線程僅僅是增大吞吐量的手段,最終必需要讓每一個任務都執行完畢
      • 此策略能夠作分流,請求能夠分別緩存在線程池工做隊列->工做線程(通常是main線程)->TCP 層->客戶端 達成了必定程度的可伸縮功能
    • ThreadPoolExecutor.DiscardPolicy: 不處理新任務,直接丟棄掉。(直接丟棄掉,甚至不會拋出異常

    • ThreadPoolExecutor.DiscardOldestPolicy: 此策略將丟棄最先的未處理的任務請求(所謂的拋棄最先的未處理的任務請求,就是拋棄下一個待處理的任務,處於頭部的任務),丟棄後再次嘗試提交新的任務

    • 對於以上幾種飽和策略的理解補充:

      • 無論是哪一種策略,都會(除了DiscardPolicy策略)判斷線程是否是shutdown狀態,若是是會直接忽略新的任務
  • 任務隊列(線程安全的阻塞隊列)

    • 若是execute是線程池的任務調度器,是做爲整個線程池的入口的話,那麼任務緩衝隊列能夠看做是整個線程池的神經中樞,做爲神經中樞,其起到了解耦任務與線程的操做,之因此有這一層緩衝,纔有了所謂的調度---實際上是一個生產者、消費者模式,其中生產者(通常是建立線程池的線程,main線程)投聽任務,消費者(線程池中的工做線程)取任務去執行
    • 顯然是一個隊列模型,構造函數要求的任務隊列類型是BlockingQueue<Runnable>,可是這個是一個接口類,真正可使用的實現類有以下幾種(具體的描述看本身總結的線程安全的容器這個文章):
      • LinkedBlockingQueue 無界隊列,所謂的無界隊列就是隊列長度默認設置爲Integr.MAX_VALUE,使用工具類建立線程池時,newFixedThreadPool與SingleThreadExecutor都是使用的此類型的隊列
      • ArrayBlockingQueue 有界隊列,通常本身使用的就是此隊列,數組實現,一旦建立大小不可更改
      • DelayedWorkQueue 定時任務的線程池使用的是此隊列,按照任務的下一次執行的時間的遲早進行排序
      • SynchronousQueue 直接提交,也就是不把任務存儲,而知直接提交給線程,newCachedTHreadPool使用的就是這個
      • 除此以外還有多種任務隊列可供使用,包括優先級隊列,雙端隊列等等
  • 線程工廠

    • 在建立線程池的時候,還能夠傳入一個重要的參數就是線程工廠,默認狀況下的線程池建立線程的過程都是其內部的DefaultThreadFactory,可是若是要用自定義的方式建立線程,以實現對於線程池建立的線程的監控與控制的話,就須要用到這個線程工廠的參數
execute方法--任務調度
  • 任務調度是整個線程池的入口,是整個線程池的核心所在,而這個任務調度對應的實際上就是execute方法

    • execute做爲任務調度方法的大體運做流程是根據線程池的運行狀態,工做線程的數量與運行策略來決定新提交的任務的三種可能的去向:
      • 直接申請線程執行
      • 緩衝到阻塞隊列中
      • 使用配置好的拒絕策略,拒絕任務的執行
  • 線程池中最重要的方法必定是任務的提交執行方法,又因爲submit內部實際調用了execute方法,因此直接查看ThreadPoolExecutor的execute方法

    // 存放線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount)
    
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    
    // 獲取當前線程池內正在運行的線程數量
    private static int workerCountOf(int c) {
    
    ​    return c & CAPACITY;
    
    }
    
    
    // 任務隊列
    private final BlockingQueue<Runnable> workQueue;
    
    
    
    public void execute(Runnable command) {
    
    ​    // 首先確定是檢查的任務的有效性,若是爲null就要報空指針異常if (command == null)
    
    ​            throw new NullPointerException();
    
    ​        /* ​ \* Proceed in 3 steps: ​ * ​ \* 1. If fewer than corePoolSize threads are running, try to ​ \* start a new thread with the given command as its first ​ \* task. The call to addWorker atomically checks runState and ​ \* workerCount, and so prevents false alarms that would add ​ \* threads when it shouldn't, by returning false. ​ * ​ \* 2. If a task can be successfully queued, then we still need ​ \* to double-check whether we should have added a thread ​ \* (because existing ones died since last checking) or that ​ \* the pool shut down since entry into this method. So we ​ \* recheck state and if necessary roll back the enqueuing if ​ \* stopped, or start a new thread if there are none. ​ * ​ \* 3. If we cannot queue task, then we try to add a new ​ \* thread. If it fails, we know we are shut down or saturated ​ \* and so reject the task. ​ */// 檢查完提交的任務的有效性以後就要執行如上英文註釋的三個步驟的處理了// 得到線程池的狀態與當前運行的線程的數量的記錄(ThreadPoolExecutor類中定義了五種線程池狀態)
      			 // ctl更像是一個線程池的運行時上下文的狀態維護變量int c = ctl.get();
    
    ​        // 1.首先判斷當前線程池中之行的任務數量是否小於 corePoolSize// 若是小於的話,經過addWorker(command, true)新建一個線程,並將任務(command)添加到該線程中;而後,啓動該線程從而執行任務。if (workerCountOf(c) < corePoolSize) {
    
    ​            if (addWorker(command, true))
    
    ​                return;
    						// addWorker失敗,從新得到線程池狀態,以進行下一步判斷
    ​            c = ctl.get();
    
    ​        }
    
    ​        // 2.若是當前之行的線程數量大於等於 corePoolSize 或者addWorker失敗(失敗的緣由多是有效線程的數量已經大於corePoolSize,因此須要緩存任務)後就會走到這裏// 經過 isRunning 方法判斷線程池狀態,線程池處於 RUNNING 狀態纔會將任務加入到任務隊列中,而且判斷是否能加入到任務隊列中if (isRunning(c) && workQueue.offer(command)) {
    
    ​            // 成功將任務添加到任務隊列中// 再次檢查線程池中的線程狀態,並再次檢查線程池中是否有可用的線程,由於自從上一次檢查後,可能有線程已經完成了工做或者線程池已經shutdown了int recheck = ctl.get();
    
    ​            // 若是線程池狀態不是running狀態,就要從任務隊列中移除任務,至關於一次回滾,並執行構造函數中參數指定的飽和策略if (! isRunning(recheck) && remove(command))
    								// 執行回滾
    ​                reject(command);
    
    ​            // 若是當前線程池是running狀態而且工做線程爲0(以前運行的工做線程被回收了,corePoolSize也有可能被回收)就新建立一個線程,其中worker的初始任務爲null
    						// else if (workerCountOf(recheck) == 0)
    
    ​                addWorker(null, false);
    
    ​        }
    
    ​        // 3. 任務隊列已經滿了,或者線程池已經不是running狀態了 所以作最後的嘗試,在建立一個新的線程試試,若是建立失敗,表示線程池已經滿了,所以執行飽和策略else if (!addWorker(command, false))
    
    ​            reject(command);
    
    }
    複製代碼
    • 事實上,源代碼的註釋上就已經說的很清楚了。

      • 文字說明
        • 首先要保證的是,線程池狀態必須是RUNNING狀態纔會接受新的任務,若是不是RUNNING狀態,會直接執行拒絕策略reject函數
        • 若是當前線程池中的有效線程池數量小於corePoolSize說明還能夠無腦添加新的線程並使其執行新的任務(這也說明corePoolSize數量的線程也是懶建立的,不是默認就自動維護這麼多數量的線程
        • 若是大於的話,就要嘗試將新的任務緩存到任務隊列中,若是任務隊列沒有滿就加到任務隊列,若是滿了,沒辦法只能繼續擴張線程數量了,此時判斷當前的有效線程數量是否小於maximumPoolSize若是小於,則建立新的線程並用來執行新的任務;若是有效線程數量已經大於maximumPoolSize,只能去執行拒絕策略了
      • 不要忽略源碼中的double-check機制,在成功將任務加到緩存隊列中後,要double-check線程池的狀態是否仍是RUNNING狀態,若是不是及時回滾提交的任務,執行拒絕策略,除此以外還要檢查工做線程是否已經所有被回收,若是所有被回收,須要建立一個空的Worker以備用,不能讓線程池爲空(若是線程池一直爲空的話,下一個任務至關於純建立一個線程,沒法發揮線程池的性能優點了)
線程管理與任務的獲取
  • 線程池爲了方便的掌握線程的狀態與維護線程的週期,設計了工做線程對象WorkerWorker起做用的最關鍵的就是實現了Runnable的接口(使得Worker能夠做爲線程任務被執行,至關於將提交的任務作了包裝)和繼承了AQS類(控制線程的中斷,維護線程的生命週期)

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
      final Thread thread;
      Runnable firstTask;
      // ...
    }
    複製代碼
    • Worker中最重要的兩個成員變量

      • thread,在Worker的構造函數中被建立,使用的是ThreadPoolExecutor建立時傳入的threadFactory去執行線程的構建

      • firstTask,firstTask用來保存傳入的第一個任務,這個任務能夠有也能夠爲null。若是這個值是非空的,那麼線程就會在啓動初期當即執行這個任務,也就對應核心線程建立時的狀況,執行完firstTask後再去隊列中取後續的任務若是這個值是null,那麼就須要建立一個線程去執行任務列表(workQueue)中的任務,也就是非核心線程的建立

        • 上邊的描述針對的是通常的線程池,對於定時任務線程池來講,添加核心線程時,firstTask也是爲null
        image-20210423201541482
  • addWorker函數(任務調度時建立核心線程執行任務或者建立非核心線程)

    // 全局鎖,併發操做必備
    private final ReentrantLock mainLock = new ReentrantLock();
    // 跟蹤線程池的最大大小,應該只有在持有全局鎖mainLock的前提下才訪問此屬性
    private int largestPoolSize;
    // 工做線程集合,存放線程池中全部的(活躍的)工做線程,只有在持有全局鎖mainLock的前提下才能訪問此集合
    private final HashSet<Worker> workers = new HashSet<>();
    //獲取線程池狀態
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    //判斷線程池的狀態是否爲 Running
    private static boolean isRunning(int c) {
      return c < SHUTDOWN;
    }
    
    // 返回true表示建立並啓動線程成功
    // firstaTask就是這個線程的初始任務
    // 第二個參數爲true表示新的worker也就是工做線程是嘗試加到corePool中仍是maximumPool中
    
    private boolean addWorker(Runnable firstTask, boolean core) {
            // retry標誌位經常使用於多循環嵌套的流程控制
            retry:
            for (;;) {
              	// 獲取線程池狀態
                int c = ctl.get();
                int rs = runStateOf(c);
    
                
                // 若是狀態>= SHUTDOWN 表示線程池是正在關閉(SHUTDOWN)或者已經關閉(>SHUTDOWN)狀態的處置方法:
                // 若是此時是shutdown狀態,而且沒有分配初始任務,而且任務隊列不爲空,則是容許建立新的worker的(此時新建立的worker用來在SHUTDOWN狀態下,執行任務隊列中剩餘的任務),違反任一則是不容許的,好比線程池已經關閉(>SHUTDOWN)或者是SHUTDOWN狀態,可是附加了本身的初始任務,是不容許的,只能執行隊列中剩餘的任務,或者隊列已經爲空了,再也不須要新的worker了,也會建立失敗
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                 		// 獲取線程池中的線程數量
                    int wc = workerCountOf(c);
                    // 判斷當前線程池的數量與哪一個值比較,若是已經達到最終的最大值CAPACITY,當即返回false
                    // 不然根據參數判斷與哪一個值比較,是最小值仍是最大值比較,若是目標是建立核心線程,就和corePoolSize比較,若是已經達到設計大小了,就建立失敗,若是目標是建立非核心線程,就和maximumPoolSize比較
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 上一步判斷後還能夠添加worker,就使用CAS增長worker計數
                    if (compareAndIncrementWorkerCount(c))
                        // 跳出整個循環
                        break retry;
                    // CAS 失敗
                    c = ctl.get();
                    // 若是線程池狀態發生了改變,
                    if (runStateOf(c) != rs)
                        // 從頭開始執行整個外部的for循環,從新根據線程池狀態進行判決
                        continue retry;
                    // 省略的else表示的就是CAS失敗的緣由是線程數量被同步修改了,只須要從新執行內部的for循環,根據線程數量進行判決便可
                }
            }
      
      
      		  // 線程數量成功更新,
    				// 初始化工做線程啓動成功標誌
            boolean workerStarted = false;
      			// 初始化工做線程建立成功標誌
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 建立worker實例
                w = new Worker(firstTask);
                // 得到worker持有的線程實例
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                   // 加鎖
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        // 獲取線程池狀態
                        int rs = runStateOf(ctl.get());
    										// < shutdown也就是running狀態下執行操做 
                        // 或者是在SHUTDOWN狀態下,而且firstTask爲空時執行下述操做
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 判斷線程是否已經啓動了,若已經啓動了,拋出異常
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                          	// 將新建立的worker實例添加到worker集合中,是一個HashSet集合
                            workers.add(w);
                           // 更新largestPoolSize
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                           // 設置worker添加成功標誌位
                            workerAdded = true;
                        }
                    } finally {
                        // 釋放鎖
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        // 啓動worker
                        t.start();
                        // 設置worker啓動成功標誌位
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                   // worker啓動失敗,要執行回滾
                   // 從工做線程集合中移除新添加的Worker實例
                   // 線程池狀態中線程池數量-1
                   // tryTerminate
                  addWorkerFailed(w);
            }
            // 返回worker是否啓動的狀態
            return workerStarted;
      }
    複製代碼
    • addWorker源碼中,也能看見線程池狀態與線程池數量共同決定流程走向的場景,這就是爲何要把這兩個狀態維護在一個變量中的緣由
    • 線程池經過維護workers這個集合來維護線程不被回收,當須要回收時,只須要將其引用消除,也就是將Worker對象消除便可,jvm會完成後續的回收(詳見線程回收小節)
    • 流程圖
    image-20210426092911659
  • runWorker函數,worker開始執行任務

    /* * 如何addWorker中啓動線程的語句t.start()轉到了runWorker方法呢? */
    
    // Worker的線程實例是在Worker構造函數中完成初始化的,注意,傳入newThread的是this,也就是Worker實例自己被當作一個Runnable任務提交到了線程中,因此調用線程實例的start方法時,就會執行Runnable任務也就是Worker實例的run方法
    
    Worker(Runnable firstTask) {
      setState(-1); // 將AQS計數設置爲-1,目的是爲了在worker初始化致使runWorker被執行的期間內不被中斷
      this.firstTask = firstTask;
      // thread成員變量由默認的或者指定的線程工廠建立,傳入的Runnable參數是Worker實例自己
      this.thread = getThreadFactory().newThread(this);
    }
    
    // 新線程啓動後執行此方法
    public void run() {
      runWorker(this);
    }
    
    
    
    // addWorker: 
    // 建立worker實例
    w = new Worker(firstTask);
    final Thread t = w.thread;
    
    // 開啓新線程後執行Runnbale參數的run方法,也就是Worker實例的run方法
    t.start()
      
    /* * 上述代碼是addWorker中的啓動線程的代碼 * 下邊的代碼是runWorker中的代碼 */
    
      
    // 實際在新線程中執行的方法
    final void runWorker(Worker w) {
          Thread wt = Thread.currentThread();
          // 首先保存初始任務。再清空初始任務
          Runnable task = w.firstTask;
          w.firstTask = null;
          w.unlock(); // worker 容許中斷,此時worker的狀態是空閒狀態,能夠被回收(中斷)
          
          // 初始化線程異常退出標誌位
          boolean completedAbruptly = true;
          try {
            // 執行初始化任務,或者while循環不斷的阻塞以從任務隊列得到新的任務,除非getTask返回null,表示已經沒法得到任務,須要執行線程回收
            while (task != null || (task = getTask()) != null) {
              // 成功的獲取了任務
              // 開始執行任務,不容許中斷,此時線程是非空閒狀態
              w.lock();
              
              // 執行recheck 若是線程池已經關閉了,而且當前線程尚未中斷,就要執行對當前線程的中斷,不然要保證當前線程不是中斷狀態
              if ((runStateAtLeast(ctl.get(), STOP) ||
                   (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                wt.interrupt();
              try {
                // 鉤子函數,默認的鉤子函數的函數體爲空,能夠去構造ThreadPoolExecutor的子類去複寫此鉤子函數
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                  // 執行任務
                  task.run();
                } catch (RuntimeException x) {
                  thrown = x; throw x;
                } catch (Error x) {
                  thrown = x; throw x;
                } catch (Throwable x) {
                  thrown = x; throw new Error(x);
                } finally {
                  // 鉤子函數與beforeExecute同理
                  afterExecute(task, thrown);
                }
              } finally {
                task = null;
                // worker的完成的任務數量加1,注意此時是線程安全的
                w.completedTasks++;
                // 釋放鎖
                w.unlock();
              }
            }
            // 線程不是由於異常退出的,而是由於沒法得到任務致使退出的
            completedAbruptly = false;
          } finally {
            // while循環已沒法經過getTask得到新的任務了,具體的緣由參考後續的getTask方法
            // 執行線程回收
            // 若是是由於task執行時出現異常,completedAbruptly爲true,不然爲false
            processWorkerExit(w, completedAbruptly);
          }
        }
    複製代碼
    • 如何addWorker中啓動線程的語句t.start()轉到了runWorker方法呢?這一塊比較繞,能夠直接看代碼的註釋
      • 這裏就體現了Worker類實現Runnable接口的做用,就是將本身包裝爲可執行任務
    • 總的來講,Worker啓動後的命運就是孜孜不倦的執行任務與啓動任務,這就是打工人?
    • 致使出現線程回收的緣由有兩個
      • 任務執行過程當中出現異常
      • 沒法從隊列中獲取新的任務,具體緣由參考geTask方法
  • getTask方法,也就是從隊列中獲取任務的方法也是很重要的,主要功能是核心線程獲取任務或保持阻塞,非核心線程獲取任務,或超時返回null,進而線程生命週期結束

    private Runnable getTask() {
      			// 獲取任務超時的標誌位
            boolean timedOut = false;
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
               
                //線程池狀態是STOP以後的狀態,表示已經不處理任務了,或者是SHUTDOWN時,任務隊列已經爲空,想處理也沒的處理了,就直接返回null,worker會被直接回收
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    // 工做線程數量-1
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
              //是否超時控制,allowCoreThreadTimeOut默認false,表明不對核心線程作超時限制,對於超出核心線程的線程須要控制超時
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
              
              
    					//當線程數大於最大線程數,即線程池已經滿了,或者須要作超時控制且上次獲取任務就已經超時這兩個任一的條件下
              //且線程數大於1或者隊列爲空,嘗試將線程數減一併返回null
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    // 失敗重試,從新根據線程池狀態與線程池中線程數量作判斷
                    continue;
                }
    
                try {
                  //當須要超時控制時,在keepAliveTime時間內沒有獲取到任務的話會設置超時標誌位,若是沒有超時限制,則調用take獲取任務,此時線程是阻塞等待獲取任務的
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    // 阻塞等待獲取任務時,整個worker並無加鎖,也就是被認爲是空閒狀態,可能會被回收掉
                    timedOut = false;
                }
            }
        }
    複製代碼
    • 這裏須要補充的就是任務隊列的poll與take方法雖然名稱差別比較大,可是惟一的差別在於前者是加了超時時間,後者是阻塞

      image-20210422141443980

    • getTask這部分進行了屢次判斷,爲的是控制線程的數量,使其符合線程池的狀態。若是線程池如今不該該持有那麼多線程,則會返回null值。工做線程Worker會不斷接收新任務去執行,而當工做線程Worker接收不到任務的時候,就會開始被回收

      • 好比在下邊這段代碼中

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          if (compareAndDecrementWorkerCount(c))
            return null;
          // 失敗重試,從新根據線程池狀態與線程池中線程數量作判斷
          continue;
        }
        複製代碼
        • 當前線程池內的線程數量超過最大值會進行線程回收
        • 存在超時設置,而且上一次獲取任務已經超時時,若是任務隊列還有很多任務,線程數量又剛好只有一個,是不會對當前這個獨苗進行回收的,而是再試試
    • decrementWorkerCountcompareAndDecrementWorkerCount兩者的區別是什麼

      • decrementWorkerCount內部是在循環調用compareAndDecrementWorkerCount,換句話說就是,必需要嘗試將工做線程數量-1,由於確實不須要此線程了,而compareAndDecrementWorkerCount直接拿來用,只是嘗試一次將工做線程-1,若是失敗的話,就要從新根據狀態作出可能與以前不一樣的判斷
  • 線程回收,processWorkerExit

    實際上,關於線程回收,是有兩種場景的:1. 主動的線程回收,好比processWorkerExit函數這樣的 2. 探查式的回收,或者說是被動的回收,好比interruptIdleWorkers

    主動回收:在runWorker函數中,若是沒法再得到任務,就會跳出執行此線程回收函數,實際上線程池中線程的回收依賴的是JVM的自動回收,線程池要作的只是把線程的引用消除而已

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // 一個標誌位,是不是由於發生線程異常,因此進入的此方法
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                // 工做線程數-1
                decrementWorkerCount();
            final ReentrantLock mainLock = this.mainLock;
            // 加鎖,由於要進行審計計數了
            mainLock.lock();
            try {
                // 統計此worker的完成的任務數目
                completedTaskCount += w.completedTasks;
             		// 從線程池中移除此線程
                // 執行remove方法完畢後,實際上已經完成了線程的回收,可是因爲引發線程銷燬的可能性有不少,線程池還要判斷是什麼引起了此次銷燬,是否要改變線程池的現階段狀態,是否要根據新狀態,從新分配線程-----即所謂的線程狀態自適應的過程
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    				
      			// 嘗試中斷、回收空閒線程
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    // 線程池狀態是RUNNING或SHUTDOWN狀態而且並不是由於異常致使線程關閉的狀況下
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    // 若是線程夠用,就直接返回,不然還要添加一個worker到線程池
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
              // 若是由於線程異常致使的線程關閉的話,還須要再向線程池中補充一個worker
              // 或者是此時線程數量不能知足最小要求時也要再添加一個worker
                addWorker(null, false);
            }
        }
    複製代碼

    被動回收:上述代碼中提到的tryTerminate方法,也就是在某worker結束生命週期後判斷線程池是否要關閉以及回收空閒線程,以便有效的管理線程池的生命週期,在全部可能致使線程池終止的地方都調用了此方法

    final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                //當線程池狀態是RUNNING(狀態正常)或者已經TIDYING或者已經TERMINATED(線程已經快關閉了)或者SHUTDOWN且還有任務沒有被執行(SHUTDOWN狀態須要處理完隊列中的任務),直接返回
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
              // 當前線程池狀態是STOP狀態或是SHUTDOWN狀態但任務列表爲空時,若是線程數量不爲0,須要最多終止1個空閒的線程,上邊所述的stop狀態或者shutdown狀態而且queue爲空統稱爲終止流程開始的狀態
              // 若是線程數不爲0,則中斷一個阻塞等待任務的空閒的工做線程
                if (workerCountOf(c) != 0) { 
                    // 嘗試中斷最多一個阻塞等待任務的空閒的工做線程
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    						// 若是當前工做線程數量爲0就準備關閉線程池
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                  // 嘗試設置線程池狀態爲tidying狀態
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            // 若是設置成功調用此鉤子方法
                            terminated();
                        } finally {
                            // 鉤子方法執行完畢後,設置狀態爲TERMINATED,並設置線程數量爲0
                            ctl.set(ctlOf(TERMINATED, 0));
                            // 通知調用awaitTermination的主線程,已經進入了TERMINATION狀態
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // CAS失敗的話,就從新根據狀態進行判斷
            }
        }
    複製代碼
    • 調用了tryTerminate方法的地方有

      • addWorkerFailed
      • processWorkerExit
      • shutdown
      • shutdownNow
      • remove從隊列中移除某任務
      • purge從隊列中移除全部被取消的任務
    • 在被動回收過程當中,最重要的就是能瞭解線程的當前狀態,在主動回收中尚且能夠知道線程是須要回收的,可是被動回收時實際上並不清楚線程池中線程的狀態,Worker經過繼承AQS,使用AQS來實現不可重入的獨佔鎖(使用AQS的獨佔模式)這個功能

      • 沒有使用可重入鎖ReentrantLock,而是使用AQS,爲的就是實現不可重入的特性去反應線程如今的執行狀態
      • lock方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中,若是正在執行任務,則不該該中斷線程
      • 若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷
      image-20210427175959020
    • interruptIdleWorkers,中斷空閒線程,使其再也不阻塞等待任務

      private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          for (Worker w : workers) {
            Thread t = w.thread;
            // 判斷線程是否已經被中斷,是的話就什麼都不作
            // 若未被中斷,還要嘗試獲取worker的鎖,此時若是worker若是已經經過lock方法獲取了鎖,則由於其不可重入的特性,致使此處爲false,即對該worker不作任務處理
            // 使用tryLock方法來判斷線程池中的線程是不是空閒狀態
            if (!t.isInterrupted() && w.tryLock()) {
              try {
                // 執行線程中斷
                t.interrupt();
              } catch (SecurityException ignore) {
              } finally {
                // worker釋放鎖
                w.unlock();
              }
            }
            // 若是未true,最多隻會中斷一個空閒線程,也可能一個線程也沒有中斷
            // 若是爲false,則會持續遍歷所有的worker,並嘗試中斷全部的空閒的線程
            if (onlyOne)
              break;
          }
        } finally {
          mainLock.unlock();
        }
      }
      
      
      // shutdownNow函數中調用的中斷全部工做線程的方法
      private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          for (Worker w : workers)
            // 粗暴的中斷全部線程
            w.interruptIfStarted();
        } finally {
          mainLock.unlock();
        }
      }
      
      // 定義在worker類中,粗暴的打斷全部的已經執行過runWorker方法的worker
      void interruptIfStarted() {
        Thread t;
        // getState() >= 0即state != -1,也就是否是剛初始化的Worker,而是已經運行runWorker的Worker
        // 直接在線程層面執行中斷,而無論worker此時是不是正在運行的狀態(不用去獲取worker的鎖)
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
          try {
            t.interrupt();
          } catch (SecurityException ignore) {
          }
        }
      }
      
      // shutdown中調用了中斷全部空閒線程的方法
      private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
      }
      複製代碼
    • 只有在線程池終止流程開始狀態下(線程池狀態準備轉入TIDYING狀態,可是還有空閒線程的時候),傳入的參數爲true,其他調用都是false,也就是中斷全部的空閒線程

      • 爲何僅在tryTerminate方法中,傳入的參數爲true,也就是最多中斷一個空閒的線程呢?(解釋的不是很清除,本身不是很懂.......)

        • 當前線程池狀態是STOP狀態或是SHUTDOWN狀態但任務列表爲空時,若是線程數量還不爲0,這說明,有多是剩餘的全部線程都是阻塞,而不能傳遞shutdown的指令,在線程池終止流程開始的狀態下,必須最多使一個阻塞在等待獲取任務的線程中斷,才能傳播shutdown信號,以避免全部的線程陷入等待而沒法關閉線程池

        • 中斷一個空閒線程,也能保證在線程池已是SHUTDOWN狀態後,新來的Worker也能最終退出

        • 綜上,爲了保證線程池將來最終可以終止,老是僅中斷一個空閒的工做程序就足夠了,可是shutdown會中斷全部空閒的工做程序,以便多餘的工做程序迅速退出

        • 參考interruptIdleWorkers的註釋

          Interrupts threads that might be waiting for tasks (as indicated by not being locked) so they can check for termination or configuration changes. Ignores SecurityExceptions (in which case some threads may remain uninterrupted). Params: onlyOne – If true, interrupt at most one worker. This is called only from tryTerminate when termination is otherwise enabled but there are still other workers. In this case, at most one waiting worker is interrupted to propagate shutdown signals in case all threads are currently waiting. Interrupting any arbitrary thread ensures that newly arriving workers since shutdown began will also eventually exit. To guarantee eventual termination, it suffices to always interrupt only one idle worker, but shutdown() interrupts all idle workers so that redundant workers exit promptly, not waiting for a straggler task to finish.

  • 從AQS的角度理解Worker的生命週期

    Worker使用的是AQS的獨佔模式,使用獨佔的特性來判斷Worker自己是空閒狀態(未上鎖)仍是工做狀態(上鎖)

    //1. worker初始化
    Worker(Runnable firstTask) {
      setState(-1); // 設置AQS計數標誌爲-1,其目的是爲了防止初始化到runWorker執行這段時間內被中斷
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    
    // 2. runWorker函數
    final void runWorker(Worker w) {
      Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      // 至此worker是被以被中斷的,也就是進入了空閒狀態
      w.unlock();
      // ...
      w.lock();
    }
    
    // worker釋放鎖
    public void unlock() { release(1); }
    
    // 獨佔模式下釋放資源
    public final boolean release(int arg) {
      if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
          unparkSuccessor(h);
        return true;
      }
      return false;
    }
    
    protected boolean tryRelease(int unused) {
      // 設置獨佔的線程爲null
      setExclusiveOwnerThread(null);
      // 設置狀態爲0
      setState(0);
      return true;
    }
    
    // worker上鎖
    public void lock() { acquire(1); }
    
    public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    
    // worker的實現,其實根本沒有用到參數--1 由於規定就是狀態1爲上鎖的狀態,因此直接用的常量1
    protected boolean tryAcquire(int unused) {
      // 嘗試得到worker的鎖,必須保證鎖狀態的舊狀態是0,才能設置狀態爲1
      if (compareAndSetState(0, 1)) {
        // 設置當前線程爲獨佔線程
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    
    // interruptIdleWorkers函數執行時嘗試中斷空閒的線程,會經過嘗試獲取鎖的方法來判斷線程的狀態
    // 在tryAcquire方法中嘗試設置狀態爲1,可是狀態的當前值應是0(即執行unlock()以後),才能設置成功
    // 這一點也保證了,在Worker初始化設置狀態爲-1到runWorker的狀態設置爲0時,是可以保證不被中斷的
    public boolean tryLock() { return tryAcquire(1); }
    複製代碼

線程池數量大小的肯定

線程池數量的肯定一直是困擾着程序員的一個難題,大部分程序員在設定線程池大小的時候就是隨心而定。

不少人甚至可能都會以爲把線程池配置過大一點比較好!我以爲這明顯是有問題的。就拿咱們生活中很是常見的一例子來講:並非人多就能把事情作好,增長了溝通交流成本。你原本一件事情只須要 3 我的作,你硬是拉來了 6 我的,會提高作事效率嘛?我想並不會。 線程數量過多的影響也是和咱們分配多少人作事情同樣,對於多線程這個場景來講主要是增長了上下文切換成本。不清楚什麼是上下文切換的話,能夠看我下面的介紹。

上下文切換:

多線程編程中通常線程的個數都大於 CPU 核心的個數,而一個 CPU 核心在任意時刻只能被一個線程使用,爲了讓這些線程都能獲得有效執行,CPU 採起的策略是爲每一個線程分配時間片並輪轉的形式。當一個線程的時間片用完的時候就會從新處於就緒狀態讓給其餘線程使用,這個過程就屬於一次上下文切換。歸納來講就是:當前任務在執行完 CPU 時間片切換到另外一個任務以前會先保存本身的狀態,以便下次再切換回這個任務時,能夠再加載這個任務的狀態。任務從保存到再加載的過程就是一次上下文切換

上下文切換一般是計算密集型的。也就是說,它須要至關可觀的處理器時間,在每秒幾十上百次的切換中,每次切換都須要納秒量級的時間。因此,上下文切換對系統來講意味着消耗大量的 CPU 時間,事實上,多是操做系統中時間消耗最大的操做。

Linux 相比與其餘操做系統(包括其餘類 Unix 系統)有不少的優勢,其中有一項就是,其上下文切換和模式切換的時間消耗很是少。

類比於實現世界中的人類經過合做作某件事情,咱們能夠確定的一點是線程池大小設置過大或者太小都會有問題,合適的纔是最好。

若是咱們設置的線程池數量過小的話,若是同一時間有大量任務/請求須要處理,可能會致使大量的請求/任務在任務隊列中排隊等待執行,甚至會出現任務隊列滿了以後任務/請求沒法處理的狀況,或者大量任務堆積在任務隊列致使 OOM。這樣很明顯是有問題的! CPU 根本沒有獲得充分利用。可是,若是咱們設置線程數量太大,大量線程可能會同時在爭取 CPU 資源,這樣會致使大量的上下文切換,從而增長線程的執行時間,影響了總體執行效率。

有一個簡單而且適用面比較廣的公式:

  • CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,能夠將線程數設置爲 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是爲了防止線程偶發的缺頁中斷,或者其它緣由致使的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處於空閒狀態,而在這種狀況下多出來的一個線程就能夠充分利用 CPU 的空閒時間。
  • I/O 密集型任務(2N): 這種任務應用起來,系統會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會佔用 CPU 來處理,這時就能夠將 CPU 交出給其它線程使用。所以在 I/O 密集型任務的應用中,咱們能夠多配置一些線程,具體的計算方法是 2N。

如何判斷是 CPU 密集任務仍是 IO 密集任務?

CPU 密集型簡單理解就是利用 CPU 計算能力的任務好比你在內存中對大量數據進行排序。單凡涉及到網絡讀取,文件讀取這類都是 IO 密集型,這類任務的特色是 CPU 計算耗費時間相比於等待 IO 操做完成的時間來講不多,大部分時間都花在了等待 IO 操做完成上。

ForkJoin線程池

  • 此線程池用來把大量數據的計算進行拆分(好比一個超大數組的求和),分配給線程池中的多個線程並行去執行,有並行計算那味了

  • Java標準庫提供的java.util.Arrays.parallelSort(array)能夠進行並行排序,它的原理就是內部經過Fork/Join對大數組分拆進行並行排序,在多核CPU上就能夠大大提升排序的速度。還有stream的許多操做底層都用了ForkJoin線程池

參考

  1. Java線程池實現原理及其在美團業務中的實踐--美團技術團隊
  2. Java線程池學習總結
  3. Java線程池
相關文章
相關標籤/搜索