併發編程之Executor線程池原理與源碼解讀

點贊再看,養成習慣,公衆號搜一搜【一角錢技術】關注更多原創技術文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有個人系列文章。html

前言

在介紹線程池以前,咱們先回顧下線程的基本知識。其中線程池包括ThreadPoolExecutor 默認線程和ScheduledThreadPoolExecutor 定時線程池 ,本篇重點介紹ThreadPoolExecutor線程池。java

線程

線程是調度CPU資源的最小單位,線程模型分爲KLT模型與ULT模型,JVM使用的是KLT模型,Java線程與OS線程保持 1:1 的映射關係,也就是說有一個Java線程也會在操做系統裏有一個對應的線程。git

內核線程模型

內核線程(KLT):系統內核管理線程(KLT),內核保存線程的狀態和上下文信息,線程阻塞不會引發進程阻塞。在多處理器系統上,多線程在多處理器上並行運行。線程的建立、調度和管理由內核完成,效率比ULT要慢,比進程操做快。github

用戶線程模型

用戶線程(ULT):用戶程序實現,不依賴操做系統核心,應用提供建立、同步、調度和管理線程的函數來控制用戶線程。不須要用戶態/內核態切換,速度快。內核對ULT無感知,線程阻塞則進程(包括它的全部線程)阻塞。web

Java線程生命狀態

Java線程有多種生命狀態:編程

  • NEW ,新建
  • RUNNABLE ,運行
  • BLOCKED ,阻塞
  • WAITING ,等待
  • TIMED_WAITING ,超時等待
  • TERMINATED,終結

狀態切換以下圖所示: 數組

Java線程實現方式

Java線程實現方式主要有四種:緩存

  • 繼承Thread類
  • 實現Runnable接口、
  • 實現Callable接口經過FutureTask包裝器來建立Thread線程、
  • 使用ExecutorService、Callable、Future實現有返回結果的多線程。

其中前兩種方式線程執行完後都沒有返回值,後兩種是帶返回值的。服務器

繼承Thread類建立線程

Thread類本質上是實現了Runnable接口的一個實例,表明一個線程的實例。啓動線程的惟一方法就是經過Thread類的start()實例方法。start()方法是一個native方法,它將啓動一個新線程,並執行run()方法。這種方式實現多線程很簡單,經過本身的類直接extend Thread,並複寫run()方法,就能夠啓動新線程並執行本身定義的run()方法。例如:markdown

public class MyThread extends Thread {  
  public void run() {  
   System.out.println("關注一角錢技術,獲取Java架構資料");  
  }  
}  
 
MyThread myThread1 = new MyThread();  
MyThread myThread2 = new MyThread();  
myThread1.start();  
myThread2.start();
複製代碼

實現Runnable接口建立線程

若是本身的類已經extends另外一個類,就沒法直接extends Thread,此時,能夠實現一個Runnable接口,以下:

// 實現Runnable接口的類將被Thread執行,表示一個基本的任務
public interface Runnable {
    // run方法就是它全部的內容,就是實際執行的任務
    public abstract void run();
}
複製代碼
public class MyThread implements Runnable {  
  public void run() {  
  	System.out.println("關注一角錢技術,獲取Java架構資料");  
  }  
}
複製代碼

爲了啓動MyThread,須要首先實例化一個Thread,並傳入本身的MyThread實例:

MyThread myThread = new MyThread();  
Thread thread = new Thread(myThread);  
thread.start();  
複製代碼

事實上,當傳入一個Runnable target參數給Thread後,Thread的run()方法就會調用target.run(),參考JDK源代碼:

public void run() {  
  if (target != null) {  
   target.run();  
  }  
}
複製代碼

實現Callable接口經過FutureTask包裝器來建立Thread線程

Callable接口(也只有一個方法)定義以下:

public interface Callable<V> { 
	V call() throws Exception;   
} 
複製代碼
//Callable一樣是任務,與Runnable接口的區別在於它接收泛型,同時它執行任務後帶有返回內容
public class SomeCallable<V> implements Callable<V> {
	// 相對於run方法的帶有返回值的call方法
    @Override
    public V call() throws Exception {
        // TODO Auto-generated method stub
        return null;
    }

}
複製代碼
Callable<V> oneCallable = new SomeCallable<V>();   
//由Callable<Integer>建立一個FutureTask<Integer>對象: 
FutureTask<V> oneTask = new FutureTask<V>(oneCallable);
//註釋:FutureTask<Integer>是一個包裝器,它經過接受Callable<Integer>來建立,它同時實現了Future和Runnable接口。
//由FutureTask<Integer>建立一個Thread對象: 
Thread oneThread = new Thread(oneTask);   
oneThread.start();   
//至此,一個線程就建立完成了。
複製代碼

使用ExecutorService、Callable、Future實現有返回結果的線程

ExecutorService、Callable、Future三個接口實際上都是屬於Executor框架。返回結果的線程是在JDK1.5中引入的新特徵,有了這種特徵就不須要再爲了獲得返回值而大費周折了。並且本身實現了也可能漏洞百出。(下部分來說線程池了

  • 可返回值的任務必須實現Callable接口。
  • 相似的,無返回值的任務必須實現Runnable接口。

執行Callable任務後,能夠獲取一個Future的對象,在該對象上調用get就能夠獲取到Callable任務返回的Object了。

注意:get方法是阻塞的,即:線程無返回結果,get方法會一直等待。

再結合線程池接口ExecutorService就能夠實現傳說中有返回結果的多線程了。

下面提供了一個完整的有返回結果的多線程測試例子。代碼以下:

package com.niuh.thread.v4;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** * <p> * 使用ExecutorService、Callable、Future實現有返回結果的線程 * </p> */
public class MyThread {
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println(("----程序開始運行----"));
        Date date1 = new Date();

        int taskSize = 5;
        // 建立一個線程池
        ExecutorService pool = Executors.newFixedThreadPool(taskSize);
        // 建立多個有返回值的任務
        List<Future> list = new ArrayList<Future>();
        for (int i = 0; i < taskSize; i++) {
            Callable c = new MyCallable(i + " ");
            // 執行任務並獲取Future對象
            Future f = pool.submit(c);
            // System.out.println(">>>" + f.get().toString());
            list.add(f);
        }
        // 關閉線程池
        pool.shutdown();

        // 獲取全部併發任務的運行結果
        for (Future f : list) {
            // 從Future對象上獲取任務的返回值,並輸出到控制檯
            System.out.println(">>>" + f.get().toString());
        }

        Date date2 = new Date();
        System.out.println("----程序結束運行----,程序運行時間【"
                + (date2.getTime() - date1.getTime()) + "毫秒】");
    }
}

class MyCallable implements Callable<Object> {
    private String taskNum;

    MyCallable(String taskNum) {
        this.taskNum = taskNum;
    }

    public Object call() throws Exception {
        System.out.println(">>>" + taskNum + "任務啓動");
        Date dateTmp1 = new Date();
        Thread.sleep(1000);
        Date dateTmp2 = new Date();
        long time = dateTmp2.getTime() - dateTmp1.getTime();
        System.out.println(">>>" + taskNum + "任務終止");
        return taskNum + "任務返回運行結果,當前任務時間【" + time + "毫秒】";
    }
}
複製代碼

協程

協程(纖程,用戶級線程),目的是爲了追求最大力度的發揮硬件性能和提高軟件的速度,協程基本原理是:在某個點掛起當前的任務,而且保存棧信息,去執行另外一個任務;等完成或達到某個條件時,再還原原來的棧信息並繼續執行(整個過程不須要上下文切換)。

協程的概念很早就提出來了,但直到最近幾年纔在某些語言(如Lua)中獲得普遍應用。

協程的目的:當咱們在使用多線程的時候,若是存在長時間的I/O操做。這個時候線程一直處於阻塞狀態,若是線程不少的時候,會存在不少線程處於空閒狀態,形成了資源應用不完全。相對的協程不同了,在單線程中多個任務來回執行若是出現長時間的I/O操做,讓其讓出目前的協程調度,執行下一個任務。固然可能全部任務,所有卡在同一個點上,可是這只是針對於單線程而言,當全部數據正常返回時,會同時處理當前的I/O操做。

Java原生不支持協程,在純java代碼裏須要使用協程的話須要引入第三方包,如:quasar

<dependency>
	<groupId>co.paralleluniverse</groupId>
	<artifactId>quasar-core</artifactId>
	<version>0.8.0</version>
	<classifier>jdk8</classifier>
</dependency>
複製代碼

線程池

「線程池」,顧名思義就是一個線程緩存,線程是稀缺資源,若是被無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,所以 Java 中提供線程池對線程進行統一分配、調優和監控。

線程池介紹

在web開發中,服務器須要接受並處理請求,因此會爲一個請求分配一個線程來進行處理。若是每次請求都建立一個線程的話實現起來很是簡單,可是存在一個問題:若是併發的請求數量很是多,但每一個線程執行的時間很短,這樣就會頻繁的建立和銷燬線程,如此一來會大大下降系統的效率。可能出現服務器在爲每一個請求建立新線程和銷燬線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多

那麼有沒有一種辦法使執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務呢?

這就是線程池的目的。線程池爲線程生命週期的開銷和資源不足問題提供瞭解決方案。經過對多個任務重用線程,線程建立的開銷被分攤到多個任務上。

何時使用線程池?

  • 單個任務處理時間比較短;
  • 須要處理的任務數量很大。

線程池優點

  • 重用存在的線程。減小線程黃金、消亡的開銷,提升性能;
  • 提升響應速度。當任務到達時,任務能夠不須要等待線程建立就能當即執行;
  • 提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配、調優和監控。

Executor框架

Executor接口是線程池框架中最基礎的部分,定義來一個用於執行 Runnable 的 execute 方法。下面爲它的繼承與實現

ExecutorService接口

從圖中能夠看出 Executor 下有一個重要的子接口 ExecutorService ,其中定義來線程池的具體行爲

  • execute(Runnable command):履行Ruannable類型的任務;
  • submit(task):可用來提交Callable或Runnable任務,並返回表明此任務的Future對象;
  • shutdown():在完成已提交的任務後封閉辦事,再也不接管新任務;
  • shutdownNow():中止全部正在履行的任務並封閉辦事;
  • isTerminated():測試是否全部任務都履行完畢了;
  • isShutdown():測試是否該ExecutorService已被關閉;
  • awaitTermination(long,TimeUnit):接收timeout和TimeUnit兩個參數,用於設定超時時間及單位。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則返回true,不然返回false。通常狀況下會和shutdown方法組合使用;
  • invokeAll :做用是等待全部的任務執行完成後統一返回;
  • invokeAny :將第一個獲得的結果做爲返回值,而後馬上終止全部的線程。若是設置了超時時間,未超時完成則正常返回結果,若是超時未完成則報超時異常。

AbstractExcutorService抽象類

此類的定義並無特殊的意義僅僅是實現了ExecutorService接口

public abstract class AbstractExecutorService implements ExecutorService {
    //此方法很簡單就是對runnable保證,將其包裝爲一個FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    //包裝callable爲FutureTask
    //FutureTask其實就是對Callable的一個封裝
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    //提交一個Runnable類型的任務
    public Future<?> submit(Runnable task) {
        //若是爲null則拋出NPE
        if (task == null) throw new NullPointerException();
        //包裝任務爲一個Future
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        //將任務丟給執行器,而此處會拋出拒絕異常,在講述ThreadPoolExecutor的時候有講述,不記得的讀者能夠去再看看
        execute(ftask);
        return ftask;
    }

    //與上方方法相同只不過指定了返回結果
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    //與上方方法相同只是換成了callable
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    //執行集合tasks結果是最後一個執行結束的任務結果
    //能夠設置超時 timed爲true而且nanos是將來的一個時間
    //任何一個任務完成都將會返回結果
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
        //傳入的任務集合不能爲null
        if (tasks == null)
            throw new NullPointerException();
        //傳入的任務數不能是0
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        //知足上面的校驗後將任務分裝到一個ArrayList中
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        //而且建立一個執行器傳入this
        //這裏簡單講述他的執行原理,傳入this會使用傳入的this(類型爲Executor)做爲執行器用於執行任務,當submit提交任務的時候回將任務
        //封裝爲一個內部的Future而且重寫他的done而此方法就是在future完成的時候調用的,而他的寫法則是將當前完成的future添加到esc
        //維護的結果隊列中
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            //建立一個執行異常,以便後面拋出
            ExecutionException ee = null;
            //若是開啓了超時則計算死線時間若是時間是0則表明沒有開啓執行超時
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            //獲取任務的迭代器
            Iterator<? extends Callable<T>> it = tasks.iterator();
            //先獲取迭代器中的第一個任務提交給前面建立的ecs執行器
            futures.add(ecs.submit(it.next()));
            //前面記錄的任務數減一
            --ntasks;
            //當前激活數爲1
            int active = 1;
            //進入死循環
            for (;;) {
                //獲取剛纔提價的任務是否完成若是完成則f不是null不然爲null
                Future<T> f = ecs.poll();
                //若是爲null則表明任務還在繼續
                if (f == null) {
                    //若是當前任務大於0 說明除了剛纔的任務還有別的任務存在
                    if (ntasks > 0) {
                        //則任務數減一
                        --ntasks;
                        //而且再次提交新的任務
                        futures.add(ecs.submit(it.next()));
                        //當前的存活的執行任務加一
                        ++active;
                    }
                    //若是當前存活任務數是0則表明沒有任務在執行了從而跳出循環
                    else if (active == 0)
                        break;
                    //若是當前任務執行設置了超時時間
                    else if (timed) {
                        //則設置指定的超時時間獲取
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        //等待執行超時尚未獲取到則拋出超時異常
                        if (f == null)
                            throw new TimeoutException();
                        //不然使用當前時間計算剩下的超時時間用於下一個循環使用
                        nanos = deadline - System.nanoTime();
                    }
                    //若是沒有設置超時則直接獲取任務
                    else
                        f = ecs.take();
                }
                //若是獲取到了任務結果f!=null
                if (f != null) {
                    //激活數減一
                    --active;
                    try {
                        //返回獲取到的結果
                        return f.get();
                        //若是獲取結果出錯則包裝異常
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            //若是異常不是null則拋出若是是則建立一個
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //其餘任務則設置取消
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
    //對上方方法的封裝
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
    //對上方法的封裝
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    //相對於上一個方法執行成功任何一個則返回結果而此方法是所有執行完而後統一返回結果
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        //傳入的任務集合不能是null
        if (tasks == null)
            throw new NullPointerException();
        //建立一個集合用來保存獲取到的執行future
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        //任務是否執行完成
        boolean done = false;
        try {
            //遍歷傳入的任務而且調用執行方法將建立的future添加到集合中
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            //遍歷獲取到的future
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                //若是當前任務沒有成功則進行f.get方法等待此方法執行成功,若是方法執行異常或者被取消將忽略異常
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            //到這一步則表明全部的任務都已經有了確切的結果
            done = true;
            //返回任務結果集合
            return futures;
        } finally {
            //若是不是true是false 則表明執行過程當中被中斷了則須要對任務進行取消操做,若是正常完成則不會被取消
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
    //與上方方法的區別在於對於任務集合能夠設置超時時間
    //這裏會針對差別進行講解
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        //計算設置時長的納秒時間
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            //計算最終計算的確切時間點,運行時長不能超過此時間也就是時間死線
            //這裏是個細節future建立的時間並無算做執行時間
            final long deadline = System.nanoTime() + nanos;
            //獲取當前結果數
            final int size = futures.size();
            //遍歷將任務進行執行
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                //而且每次都計算死線
                nanos = deadline - System.nanoTime();
                //若是時間已經超過則返回結果
                if (nanos <= 0L)
                    return futures;
            }
            //不然遍歷future肯定每次執行都獲取到告終果
            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    //若是在等待過程當中已經超時則返回當前等待結合
                    if (nanos <= 0L)
                        return futures;
                    try {
                        //若是沒有超過死線則設置從future中獲取結果的時間若是超過則會派出timeout
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        //拋出了異常則會返回當前的列表
                        return futures;
                    }
                    //計算最新的超時時間
                    nanos = deadline - System.nanoTime();
                }
            }
            //以前的返回都沒有設置爲true因此在finally中都會設置爲取消惟獨正常執行完成到此處返回的結果纔是最終的結果
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}
複製代碼

線程池的具體實現

  • ThreadPoolExecutor 默認線程池
  • ScheduledThreadPoolExecutor 定時線程池 (下篇再作介紹)

ThreadPoolExecutor

線程池重點屬性

//用來標記線程池狀態(高3位),線程個數(低29位)
//默認是RUNNING狀態,線程個數爲0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//線程個數掩碼位數,並非全部平臺int類型是32位,因此準確說是具體平臺下Integer的二進制位數-3後的剩餘位數纔是線程的個數,
private static final int COUNT_BITS = Integer.SIZE - 3;

//線程最大個數(低29位)000 11111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
複製代碼

ctl 是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),這裏能夠看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。

ctl相關方法

  • runStateOf:獲取運行狀態;
  • workerCountOf:獲取活動線程數;
  • ctlOf:獲取運行狀態和活動線程數的值。
// 獲取高三位 運行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }

//獲取低29位 線程個數
private static int workerCountOf(int c) { return c & CAPACITY; }

//計算ctl新值,線程狀態 與 線程個數
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

線程池存在5種狀態

//運行中 111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
//關閉 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//中止 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
//整理 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
//終止 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

使用一個整形,前3位表示狀態,後29位表示線程容量,也就是說線程最多有 2 30 1 2^{30}−1

前三位 狀態 十進制
111 RUNNING -1
000 SHUTDOWN 0
001 STOP 1
010 TIDYING 2
011 TERMINATED 3

也能夠看出當ctl小於零表示線程池仍在運行

RUNNING

  • 狀態說明:線程池處在RUNNING狀態時,可以接收新任務,以及對已添加的任務進行處理。
  • 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被建立,就處於RUNNING狀態,而且線程池中的任務數爲0!

SHUTDOWN

  • 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
  • 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。

STOP

  • 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,而且會中斷正在處理的任務。
  • 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING

  • 狀態說明:當全部的任務已終止,ctl記錄的」任務數量」爲0,線程池會變爲TIDYING狀態。當線程池變爲TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變爲TIDYING時,進行相應的處理;能夠經過重載terminated()函數來實現。
  • 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列爲空而且線程池中執行的任務也爲空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的任務爲空時,就會由STOP -> TIDYING。

TERMINATED

  • 狀態說明:線程池完全終止,就變成TERMINATED狀態。
  • 狀態切換:線程池處在TIDYING狀態時,執行完terminated()以後,就會由 TIDYING -> TERMINATED。

進入TERMINATED的條件以下:

  • 線程池不是RUNNING狀態;
  • 線程池狀態不是TIDYING狀態或TERMINATED狀態;
  • 若是線程池狀態是SHUTDOWN而且workerQueue爲空;
  • workerCount爲0;
  • 設置TIDYING狀態成功。

線程池參數

corePoolSize

線程池中的核心線程數,當提交一個任務時,線程池建立一個新線程執行任務,直到當前線程數等於corePoolSize;若是當前線程數爲corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;若是執行了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部核心線程。

maximumPoolSize

線程池中容許的最大線程數。若是當前阻塞隊列滿了,且繼續提交任務,則建立新的線程執行任務,前提是當前線程數小於maximumPoolSize;

keepAliveTim

線程池維護線程所容許的空閒時間。當線程池中的線程數量大於corePoolSize的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬,而是會等待,直到等待的時間超過了keepAliveTime;

unit

keepAliveTime的單位;

workQueue

用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口,在JDK中提供了以下阻塞隊列:

  • 一、ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務;
  • 二、LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量一般要高於ArrayBlockingQuene;
  • 三、SynchronousQuene:一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQuene;
  • 四、priorityBlockingQuene:具備優先級的無界阻塞隊列;

threadFactory

它是ThreadFactory類型的變量,用來建立新線程。默認使用Executors.defaultThreadFactory() 來建立線程。使用默認的ThreadFactory來建立線程時,會使新建立的線程具備相同的NORM_PRIORITY優先級而且是非守護線程,同時也設置了線程的名稱。

handler

線程池的飽和策略,當阻塞隊列滿了,且沒有空閒的工做線程,若是繼續提交任務,必須採起一種策略處理該任務,線程池提供了4種策略:

  1. AbortPolicy:直接拋出異常,默認策略;
  2. CallerRunsPolicy:用調用者所在的線程來執行任務;
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  4. DiscardPolicy:直接丟棄任務;

上面的4種策略都是ThreadPoolExecutor的內部類。 固然也能夠根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。

線程池的建立

有四個構造函數,其餘三個都是調用下面代碼中的這個構造函數

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼

線程池監控

public long getTaskCount() //線程池已執行與未執行的任務總數 public long getCompletedTaskCount() //已完成的任務數 public int getPoolSize() //線程池當前的線程數 public int getActiveCount() //線程池中正在執行任務的線程數量 複製代碼

線程池原理

核心方法分析

execute方法

execute 方法是提交任務 command 到線程池進行執行

public void execute(Runnable command) {
    //若是任務爲null,則拋出NPE異常
    if (command == null)
        throw new NullPointerException();
    /* * clt記錄着runState和workerCount */
    int c = ctl.get();
    /* * workerCountOf方法取出低29位的值,表示當前活動的線程數; * 若是當前活動線程數小於corePoolSize,則新建一個線程放入線程池中; * 並把任務添加到該線程中。 */
    if (workerCountOf(c) < corePoolSize) {
        /* * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷; * 若是爲true,根據corePoolSize來判斷; * 若是爲false,則根據maximumPoolSize來判斷 */
        if (addWorker(command, true))
            return;
        /* * 若是添加失敗,則從新獲取ctl值 */
        c = ctl.get();
    }
    /* * 若是當前線程池是運行狀態而且任務添加到隊列成功 */
    if (isRunning(c) && workQueue.offer(command)) {
		// 從新獲取ctl值
        int recheck = ctl.get();
        // 再次判斷線程池的運行狀態,若是不是運行狀態,因爲以前已經把command添加到workQueue中了,
        // 這時須要移除該command
        // 執行事後經過handler使用拒絕策略對該任務進行處理,整個方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /* * 獲取線程池中的有效線程數,若是數量是0,則執行addWorker方法 * 這裏傳入的參數表示: * 1. 第一個參數爲null,表示在線程池中建立一個線程,但不去啓動; * 2. 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize,添加線程時根據maximumPoolSize來判斷; * 若是判斷workerCount大於0,則直接返回,在workQueue中新增的command會在未來的某個時刻被執行。 */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /* * 若是執行到這裏,有兩種狀況: * 1. 線程池已經不是RUNNING狀態; * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。 * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize; * 若是失敗則拒絕該任務 */
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

簡單來講,在執行execute()方法時若是狀態一直是RUNNING時的執行過程以下:

  1. 若是 workerCount < corePoolSize,則建立並啓動一個線程來執行新提交的任務;
  2. 若是 workerCount > corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到阻塞隊列中;
  3. 若是 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;
  4. 若是 workerCount >= maximumPoolSize,而且線程池內的阻塞隊列已滿,則根據拒絕策略來處理該任務,默認的處理方式是直接拋異常。

注意:addWorker(null,false); ,也就是建立一個線程,但並無傳入從任務,由於任務已經被添加到 workQueue中了,因此 worker 在執行的時候,會直接從 workQueue 中獲取任務。因此,在workerCountof(recheck) == 0 時執行 addWorker(null,false); 也是爲了保證線程池在RUNNING狀態下必需要有一個線程來執行任務。

execute方法執行流程以下

addWorker方法

addWorker方法的主要工做是在線程池中建立一個新的線程並執行:

  • firstTask參數 用於指定新增的線程執行的第一個任務;
  • core參數 爲true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize。

代碼以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        //獲取ctl
        int c = ctl.get();
    	// 獲取運行狀態
        int rs = runStateOf(c);
        /* * 這個if判斷 * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務; * 接着判斷如下3個條件,只要有1個不知足,則返回false: * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務 * 2. firsTask爲空 * 3. 阻塞隊列不爲空 * * 首先考慮rs == SHUTDOWN的狀況 * 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false; * 而後,若是firstTask爲空,而且workQueue也爲空,則返回false, * 由於隊列中已經沒有任務了,不須要再添加線程了 */
     	// Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 獲取線程數
            int wc = workerCountOf(c);
            // 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
            // 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較,
            // 若是爲false則根據maximumPoolSize來比較。
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增長workerCount,若是成功,則跳出第一個for循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 若是增長workerCount失敗,則從新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
     	// 根據firstTask來建立Worker對象
        w = new Worker(firstTask);
     	// 每個Worker對象都會建立一個線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING狀態;
                // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。
                // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize記錄着線程池中出現過的最大線程數量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啓動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼

Worker類

線程池中的每個線程被封裝成一個 Worker 對象,ThreadPool 維護的其實就是一組 Worker 對象。

/** * Woker主要維護着運行task的worker的中斷控制信息,以及其餘小記錄。這個類繼承AbstractQueuedSynchronizer * 而來簡化獲取和釋放每個任務執行中的鎖。這能夠防止中斷那些打算喚醒正在等待其餘線程任務的任務,而不是 * 中斷正在運行的任務。咱們實現一個簡單的不可重入鎖而不是ReentrantLock,由於咱們不想當其調用setCorePoolSize * 這樣的方法的時候能得到鎖。 */
//worker主要是對進行中的任務進行中斷控制,順帶着對其餘行爲進行記錄
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;

        //正在跑的線程,若是是null標識factory失敗
        final Thread thread;
        //初始化一個任務以運行
        Runnable firstTask;
        //每一個線程計數
        volatile long completedTasks;

        /** * 用給定的firstTask和從threadFactory建立 */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        //主要調用了runWorker
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        //嘗試獲取鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //嘗試釋放鎖
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock() { acquire(1); }
        public boolean tryLock() { return tryAcquire(1); }
        public void unlock() { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
複製代碼

Worker類繼承了AQS,並實現了Runnable接口,注意其中的 firstTask 和 thread 屬性:

  • firstTask用它來保存傳入的任務;
  • thread是在調用構造方法時經過 ThreadFactory 來建立的線程,是用來處理任務的線程。

在調用構造方法時,須要把任務傳入,這裏經過 getThreadFactory().newThread(this); 來新建一個線程,newThread 方法傳入的參數是 this,由於 Worker 自己繼承了 Runnable 接口,也就是一個線程,因此一個 Worker 對象在啓動的時候會調用 Worker 類中的 run 方法。

Worker 繼承了 AQS,使用AQS來實現獨佔鎖的功能。爲何不使用 ReentrantLock 來實現呢?能夠看到 tryAcquire 方法,它是不容許重入的,而 ReentrantLock 是容許重入的:

  1. lock方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中;
  2. 若是正在執行任務,則不該該中斷線程;
  3. 若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷;
  4. 線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閒的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是不是空閒狀態;
  5. 之因此設置爲不可重入,是由於咱們不但願任務在調用像 setCorePoolSize 這樣的線程池控制方法時從新獲取鎖。若是使用ReentrantLock,它是可重入的,這樣若是在任務中調用瞭如 setCorePoolSize 這類線程池控制的方法,會中斷正在運行的線程。

因此,Worker繼承自AQS,用於判斷線程是否空閒以及是否能夠被中斷。

Worker(Runnable firstTask) {
	setState(-1); // inhibit interrupts until runWorker
	this.firstTask = firstTask;
	this.thread = getThreadFactory().newThread(this);
}
複製代碼

此外,在構造方法中執行了 setState(-1); ,把 state 變量設置爲 -1,爲何這樣作呢?是由於AQS中默認的 state 是0,若是剛剛建立了一個 Worker 對象,尚未執行任務時,這時就不該該被中斷,看一下 tryAquire 方法:

protected boolean tryAcquire(int unused) {
	//cas修改state,不可重入
    if (compareAndSetState(0, 1)) { 
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
複製代碼

tryAcquire方法是根據state是不是0來判斷的,因此,setState(-1);將state設置爲-1是爲了禁止在執行任務前對線程進行中斷。

既然是AQS,其對於state的解釋也很關鍵

state 含義
-1 初始化狀態,禁止中斷
0 解鎖狀態
1 上鎖狀態(獨佔)

正由於如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置爲0。

runWorker方法

在Worker類中的run方法調用了runWorker方法來執行任務,runWorker方法的代碼以下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 獲取第一個任務
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 解鎖,容許interrupt操做
    w.unlock(); // allow interrupts
    // 是否由於異常退出循環
    boolean completedAbruptly = true;
    try {
        // 若是task爲空,則經過getTask來獲取任務
        while (task != null || (task = getTask()) != null) {
            // 這兒對worker進行加鎖,是爲了達到下面的目的
            // 1. 下降鎖範圍,提高性能
            // 2. 保證每一個worker執行的任務是串行的
            w.lock();
            // 若是線程池處於STOP狀態就中斷線程
            // 若是線程被中斷(清除中斷標記),線程池處於STOP狀態,線程沒有再被中斷
            if ((runStateAtLeast(ctl.get(), STOP) || //至少是STOP狀態
                    (Thread.interrupted() && //中斷過(並抹除中斷標記)
                            runStateAtLeast(ctl.get(), STOP))) && //再次檢查
                    !wt.isInterrupted()) //wt沒有被中斷
                wt.interrupt();
            try {
                beforeExecute(wt, task); //鉤子方法
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); //鉤子方法
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

這裏說明一下第一個 if判斷 ,目的是:

  • 若是線程池正在中止,那麼要保證當前線程是中斷狀態;
  • 若是不是的話,則要保證當前線程不是中斷狀態;

這裏要考慮在執行該 if語句 期間可能也執行了 shutdownNow 方法,shutdownNow 方法會把狀態設置爲 STOP ,回顧一下 STOP 狀態:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入該狀態。

總結一下runWorker方法的執行過程:

  1. while循環不斷地經過getTask()方法獲取任務;
  2. getTask()方法從阻塞隊列中取任務;
  3. 若是線程池正在中止,那麼要保證當前線程是中斷狀態,不然要保證當前線程不是中斷狀態;
  4. 調用task.run()執行任務;
  5. 若是task爲null則跳出循環,執行processWorkerExit()方法;
  6. runWorker方法執行完畢,也表明着Worker中的run方法執行完畢,銷燬線程。

這裏的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實現。 completedAbruptly 變量來表示執行任務過程當中是否出現了異常,在processWorkerExit方法中會對該變量的值進行判斷。

getTask方法

getTask方法用來從阻塞隊列中取任務,代碼以下:

private Runnable getTask() {
    // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /* * 若是線程池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行如下判斷: * 1. rs >= STOP,線程池是否正在stop; * 2. 阻塞隊列是否爲空。 * 若是以上條件知足,則將workerCount減1並返回null。 * 由於若是當前線程池狀態的值是SHUTDOWN或以上時,不容許再向阻塞隊列中添加任務。 */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // timed變量用於判斷是否須要進行超時控制。
        // allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時;
        // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;
        // 對於超過核心線程數量的這些線程,須要進行超時控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /* * wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法; * timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時 * 接下來判斷,若是有效線程數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1; * 若是減1失敗,則返回重試。 * 若是wc == 1時,也就說明當前線程是線程池中惟一的一個線程了。 */
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            /* * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在keepAliveTime時間內沒有獲取到任務,則返回null; * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。 * */
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            // 若是 r == null,說明已經超時,timedOut設置爲true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 若是獲取任務時當前線程發生了中斷,則設置timedOut爲false並返回循環重試
            timedOut = false;
        }
    }
}
複製代碼

這裏重要的地方是第二個 if判斷 ,目的是控制線程池的有效線程數量。由上文中的分析能夠知道,在執行 execute 方法時,若是當前線程池的線程數量超過了 corePoolSize 且小於 maximumPoolSize,而且workQueue已滿時,則能夠添加工做線程,但這時若是超時沒有獲取到任務,也就是 timedOut 爲 true的狀況,說明 workQueue 已經爲空了,也就說明了當前線程池中不須要那麼多線程來執行任務了,能夠把多餘 corePoolSize 數量的線程銷燬掉,保持線程數量在 corePoolSize 便可。

何時會銷燬? 固然是runWorker方法執行完以後,也就是Worker中的run方法執行完,由JVM自動回收。

getTask方法返回null時,在runWorker方法中會跳出while循環,而後會執行 processWorkerExit方法。

processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 若是completedAbruptly值爲true,則說明線程執行時出現了異常,須要將workerCount減1;
    // 若是線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操做,這裏就沒必要再減了。 
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統計完成的任務數
        completedTaskCount += w.completedTasks;
        // 從workers中移除,也就表示着從線程池中移除了一個工做線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根據線程池狀態進行判斷是否結束線程池
    tryTerminate();
    int c = ctl.get();
    /* * 當線程池是RUNNING或SHUTDOWN狀態時,若是worker是異常結束,那麼會直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker; * 若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。 */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
複製代碼

至此,processWorkerExit執行完成以後,工做線程被銷燬,以上就是整個工做線程的生命週期,從 execute 方法開始,Worker 使用 ThreadFactory 建立新的工做線程,runWorker 經過 getTask 獲取任務,而後執行任務,若是 getTask 返回null,進入 oricessWorkerExit 方法,整個線程結束。以下圖所示:

shutdown方法

調用關係

shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//權限檢查
        advanceRunState(SHUTDOWN);//設置當前線程池狀態爲SHUTDOWN
        interruptIdleWorkers();//中斷空閒線程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
複製代碼

interruptIdleWorkers

中斷空閒線程

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加鎖
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {//未被中斷且正在獨佔運行
                try {
                    t.interrupt();//中斷
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();//worker解鎖
                }
            }
            if (onlyOne)//若是隻中斷一個
                break;
        }
    } finally {
        mainLock.unlock();//解鎖
    }
}
複製代碼

tryTerminate

final void tryTerminate() {
	for (;;) { // 無限循環,確保操做成功
    	// 獲取線程池控制狀態
        int c = ctl.get();
        if (isRunning(c) || // 線程池的運行狀態爲RUNNING
        	runStateAtLeast(c, TIDYING) || // 線程池的運行狀態最小要大於TIDYING
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  // 線程池的運行狀態爲SHUTDOWN而且workQueue隊列不爲null
            // 不能終止,直接返回
            return;
        if (workerCountOf(c) != 0) { // 線程池正在運行的worker數量不爲0 // Eligible to terminate
        	// 僅僅中斷一個空閒的worker
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        // 獲取線程池的鎖
        final ReentrantLock mainLock = this.mainLock;
        // 獲取鎖
        mainLock.lock();
        try {
        	if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比較並設置線程池控制狀態爲TIDYING
            	try {
                	// 終止,鉤子函數
                    terminated();
                } finally {
                	// 設置線程池控制狀態爲TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 釋放在termination條件上等待的全部線程
                    termination.signalAll();
                }
                return;
            }
        } finally {
        	// 釋放鎖
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
複製代碼

shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);//設置線程池狀態爲STOP
        interruptWorkers();//中斷線程
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
複製代碼

Executors類

Executors類,提供了一系列工廠方法用於建立線程池,返回的線程池都實現了ExecutorService接口。

關於Callable的支持

  • Callable callable(Runnable task)

    返回 Callable 對象,調用它時可運行給定的任務。

    public static Callable<Object> callable(Runnable task) {
    	if (task == null)
    		throw new NullPointerException();
    	return new RunnableAdapter<Object>(task, null);
    }
    複製代碼
    • Callable callable(Runnable task, T result)

    返回 Callable 對象,調用它時可運行給定的任務並返回給定的結果。callable(task)等價於callable(task, null)。

    public static <T> Callable<T> callable(Runnable task, T result) {
    	if (task == null)
    		throw new NullPointerException();
    	return new RunnableAdapter<T>(task, result);
    }
    複製代碼

    RunnableAdapter類

    /** * A callable that runs given task and returns given result */
    static final class RunnableAdapter<T> implements Callable<T> {
    	final Runnable task;
     	final T result;
    	RunnableAdapter(Runnable  task, T result) {
        	this.task = task;
            this.result = result;
        }
        public T call() {
        	task.run();
            return result;
        }
    }
    複製代碼
    • Callable callable(final PrivilegedAction action)

    返回 Callable 對象,調用它時可運行給定特權的操做並返回其結果。

    public static Callable<Object> callable(final PrivilegedAction<?> action) {
    	if (action == null)
        	throw new NullPointerException();
        return new Callable<Object>() {
        	public Object call() { return action.run(); }
        };
    }
    複製代碼
    • callable(PrivilegedExceptionAction action)

    和Callable callable(final PrivilegedAction action)其相似,惟一的區別在於,前者可拋出異常。

    public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
    	if (action == null)
        	throw new NullPointerException();
      	return new Callable<Object>() {
        	public Object call() throws Exception { return action.run(); }
        };
    }
    複製代碼

    PrivilegedAction接口

    public interface PrivilegedAction<T> {
        T run();
    }
    複製代碼
    • Callable privilegedCallable(Callable callable)

    返回 Callable 對象,調用它時可在當前的訪問控制上下文中執行給定的 callable 對象。

    public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
    	if (callable == null)
       		throw new NullPointerException();
    	return new PrivilegedCallable<T>(callable);
    }
    複製代碼
    • Callable privilegedCallableUsingCurrentClassLoader(Callable callable)

    返回 Callable 對象,調用它時可在當前的訪問控制上下文中,使用當前上下文類加載器做爲上下文類加載器來執行給定的 callable 對象。

    public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
    	if (callable == null)
    		throw new NullPointerException();
     	return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
    }
    複製代碼

    關於ThreadFactory的支持

    1. 根據須要建立新線程的對象。使用線程工廠就無需再手工編寫對 new Thread 的調用了,從而容許應用程序使用特殊的線程子類、屬性等等。
    2. Executors對其提供支持:DefaultThreadFactory和PrivilegedThreadFactory。
    public interface ThreadFactory {
        Thread newThread(Runnable r);
    }
    複製代碼

    ThreadFactory爲默認的defaultThreadFactory,ThreadFactory定義了兩個:defaultThreadFactory和privilegedThreadFactory,可是也能夠本身定義,實現ThreadFactory接口或者繼承已經實現的這兩個實現而且修改對應的代碼便可。

    // 返回用於建立新線程的默認線程工廠。
    public static ThreadFactory defaultThreadFactory() {
    	return new DefaultThreadFactory();
    }
    
    // 返回用於建立新線程的線程工廠,這些新線程與當前線程具備相同的權限。
    public static ThreadFactory privilegedThreadFactory() {
    	return new PrivilegedThreadFactory();
    }
    複製代碼
    • DefaultThreadFactory
    static class DefaultThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    複製代碼

    從源碼看出DefaultThreadFactory就是建立一個普通的線程,非守護線程,優先級爲5。 新線程具備可經過 pool-N-thread-M 的 Thread.getName() 來訪問的名稱,其中 N 是此工廠的序列號, M 是此工廠所建立線程的序列號。

    • PrivilegedThreadFactory
    static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final ClassLoader ccl;
        private final AccessControlContext acc;
    
        PrivilegedThreadFactory() {
            super();
            this.ccl = Thread.currentThread().getContextClassLoader();
            this.acc = AccessController.getContext();
            acc.checkPermission(new RuntimePermission("setContextClassLoader"));
        }
    
        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        public Object run() {
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        }
                    }, acc);
                }
            });
        }
    }
    複製代碼

    從源碼看出,PrivilegedThreadFactory extends DefaultThreadFactory從而具備與 defaultThreadFactory() 相同設置的線程。但增長了兩個特性:ClassLoader和AccessControlContext,從而使運行在此類線程中的任務具備與當前線程相同的訪問控制和類加載器。

    關於RejectedExecutionHandler的支持

    而默認的拒絕策略RejectedExecutionHandler則爲 AbortPolicy,而且全部的拒絕策略都是實現接口RejectedExecutionHandler

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    複製代碼

    五種線程池建立類型

    newFixedThreadPool(固定大小線程池)

    newFixedThreadPool:建立一個核心線程個數和最大線程個數都爲 nThreads 的線程池,而且阻塞隊列長度爲 Integer.MAX_VALUEkeeyAliveTime=0 說明只要線程個數比核心線程個數多而且當前空閒則回收。代碼以下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
    	return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
     }
     //使用自定義線程建立工廠
     public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    	return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
     }
    複製代碼

    newSingleThreadExecutor(單個後臺線程)

    newSingleThreadExecutor:建立一個核心線程個數和最大線程個數都爲1的線程池,而且阻塞隊列長度爲 Integer.MAX_VALUEkeeyAliveTime=0 說明只要線程個數比核心線程個數多而且當前空閒則回收。代碼以下:

    public static ExecutorService newSingleThreadExecutor() {
    	return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
     }
    
    //使用本身的線程工廠
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    	return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
    }
    複製代碼

    newCachedThreadPool(無界線程池,能夠進行自動線程回收)

    newCachedThreadPool:建立一個按需建立線程的線程池,初始線程個數爲 0,最多線程個數爲 Integer.MAX_VALUE,而且阻塞隊列爲同步隊列,keeyAliveTime=60 說明只要當前線程 60s 內空閒則回收。這個特殊在於加入到同步隊列的任務會被立刻被執行,同步隊列裏面最多隻有一個任務。代碼以下:

    public static ExecutorService newCachedThreadPool() {
    	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }
    
    //使用自定義的線程工廠
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
    }
    複製代碼

    newScheduledThreadPool (可調度)

    newScheduledThreadPool:建立一個定長線程池,支持定時及週期性任務執行。newScheduledThreadPool 和 其餘線程池最大的區別是使用的阻塞隊列是 DelayedWorkQueue,並且多了兩個定時執行的方法scheduleAtFixedRate和scheduleWithFixedDelay

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    	return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    //使用自定義的線程工廠
    public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) {
    	return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    複製代碼

    newWorkStealingPool(並行操做)

    newWorkStealingPool:JDK1.8新增newWorkStealingPool,適合使用在很耗時的操做,可是newWorkStealingPool不是ThreadPoolExecutor的擴展,它是新的線程池類ForkJoinPool的擴展,可是都是在統一的一個Executors類中實現,因爲可以合理的使用CPU進行對任務操做(並行操做),因此適合使用在很耗時的任務中。代碼以下:

    public static ExecutorService newWorkStealingPool() {
    	return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
    }
    
    public static ExecutorService newWorkStealingPool(int parallelism) {
    	return new ForkJoinPool
                (parallelism,
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
    }
    複製代碼

    相關文章

    PS:以上代碼提交在 Githubgithub.com/Niuh-Study/…

    PS:這裏有一個技術交流羣(扣扣羣:1158819530),方便你們一塊兒交流,持續學習,共同進步,有須要的能夠加一下。

    文章持續更新,能夠公衆號搜一搜「 一角錢技術 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經收錄,歡迎 Star。

相關文章
相關標籤/搜索