jdk1.8 線程池部分源碼分析

首先是老規矩,推薦一下個人企鵝交流羣:java

有興趣交流springboot進行快速開發的同窗能夠加一下下面的企鵝羣。spring

行走的java全棧

普通線程編程

  • 1.實現:繼承Thread或者實現Runnable接口
    • 1.繼承Thread,僅僅只能單繼承
    • 2.實現Runnable接口(可實現內部資源共享),接口能夠多實現
    • 3.經典問題:窗口賣票
  • 2.實例化對象
  • 3.執行任務
  • 4.銷燬線程回收資源
思考:

當多個資源須要開啓線程來處理的時候,咱們怎麼辦?是否一直在重複下面的流程:數組

create -> run -> destroy
複製代碼

咱們知道計算機的每次運行都是須要大量的資源消耗,5個線程的操做可能沒有影響,5w個呢? 五萬次建立和銷燬纔有僅僅五萬次的執行嗎?執行任務可能花費了大量的時間來處理這些建立和銷燬。緩存

線程池安全

特色
  • 1.解決處理器單元內多個線程的執行問題
  • 2.減小處理器單元閒置時間
  • 3.增長了處理器單元工做時間內的吞吐能力(爲何這麼說?咱們減小了多個任務每次線程的建立和銷燬浪費,提升了任務執行效率)
組成
  • 1.線程池管理器(ThreadPool):負責建立、管理、銷燬線程池,以及添加任務
  • 2.工做線程(PoolWorker):無任務則等待,可循環、重複執行任務
  • 3.任務接口(Task):每一個任務必須實現接口,工做線程負責調度任務的執行,規定了任務的入口,以及任務完成後的收尾工做以及任務執行狀態等等
  • 4.任務隊列(TaskQueue):存放沒有處理的任務,提供任務緩衝機制

eg:超市結帳:收營員服務組,單個收營員,收銀工做,等待被收銀的人羣springboot

JDK線程池類:java.util.concurrent.Executors和JDK線程池執行器接口:java.util.concurrent.Executor

在Executors中,jdk提供了一下相關的線程池,以下:bash

靜態方法 建立的線程池類型 返回值的實際實現
newFixedThreadPool(int) 固定線程池 ThreadPoolExecutor
newWorkStealingPool() 處理器核心數的並行線程池 ForkJoinPool
newSingleThreadExecutor() 一個線程的單獨線程池 FinalizableDelegatedExecutorService
newCachedThreadPool() 緩存線程池 ThreadPoolExecutor
newSingleThreadScheduledExecutor() 單獨線程定時線程池 DelegatedScheduledExecutorService
newScheduledThreadPool(int) 定時線程池 ScheduledThreadPoolExecutor

newSingleThreadExecutor() 一個線程的線程池this

爲何這裏我要拿一個線程的線程池來講明呢?其實咱們把簡單的搞定複雜的也是演變過來的。先上碼:spa

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                    threadFactory));
}
複製代碼

咱們能夠看到上面方法的返回值都是ExecutorService,但實際上實例化的是FinalizableDelegatedExecutorService,咱們進去看看源碼,以下:

static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
    //構造方法
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    
    //對象銷燬的時候調用
    protected void finalize() {
        super.shutdown();
    }
}
複製代碼

上面的代碼咱們能夠明顯的看到FinalizableDelegatedExecutorService僅僅是對DelegatedExecutorService的封裝,惟一實現的就是在對象銷燬的時候將ExecutorService結束。

到這裏咱們就應該返回來分析DelegatedExecutorService,以及上面的方法中的具體代碼。

咱們看看默認的單線程線程池的實現,以下:

new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
//此處的代碼實現了一個ExecutorService,分別有幾個參數?何解?

//
public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
}
//咱們能夠看到幾個參數的字面意思分別是:
//corePoolSize 核心線程數量,包括空閒線程
//maximumPoolSize 最大線程數量
//keepAliveTime 保持活躍時間(參照後續源碼,這裏應該是:當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間)
//unit keepAliveTime 參數的時間單位
//workQueue 執行前用於保持任務的隊列。此隊列僅保持由 execute方法提交的 Runnable任務
//Executors.defaultThreadFactory() 默認線程工廠
//defaultHandler 超出線程和任務隊列的任務的處理程序,實現爲:new AbortPolicy(),固然這裏默認是沒有處理的,須要咱們手動實現

//這裏,咱們接着看默認的線程工廠,畢竟線程池核心是須要線程來執行任務,因此此處先看線程來源。
static class DefaultThreadFactory implements ThreadFactory {
    //池數量,指定原子操做
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    //線程組
    private final ThreadGroup group;
    //線程數量,指定原子操做
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    //線程名稱前綴
    private final String namePrefix;

    DefaultThreadFactory() {
        //獲取系統安全管理器
        SecurityManager s = System.getSecurityManager();
        //建立線程組,由是否獲取系統安全管理器決定
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        //構造線程名稱
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    
    //建立線程
    public Thread newThread(Runnable r) {
        //將線程組、Runnable接口(線程實際執行代碼塊)、線程名、線程所須要的堆棧大小爲0
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        //若是爲守護線程,取消守護狀態,必須在線程執行前調用這個setDaemon方法
        if (t.isDaemon())
            t.setDaemon(false);
        //默認任務優先級,值爲5
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
//上面的默認線程工廠,提供給了咱們一個非守護線程的線程,由原子操做保證線程惟一,任務優先級默認(最低1,最高10,默認5,此處優先級爲5)

複製代碼

看了上面這些咱們能夠總結一下:單線程線程池,默認只有一個線程和一個線程池,等待新任務時間爲0,添加了原子操做來綁定線程。

是否是到這裏就完了? 固然沒有,咱們如今須要看看更加具體的ThreadPoolExecutor,才能更加深刻明白線程池。

public class ThreadPoolExecutor extends AbstractExecutorService {
    /** *全部的構造方法均指向這裏,因此咱們看一下這個就足夠 */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        //參數檢查,說明線程池不能線程=0,也不能最大線程數量不大於0切最大線程數量不能少於核心線程數量,等待任務最長時間不能小於0
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        //等待任務隊列、線程工廠、超任務隊列的處理程序
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        //上面的判斷,能夠看作是一種防護式編程,全部的問題預先處理,後續無需考慮相似問題
        //構造線程池相關設定閾值
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

//到了這裏其實咱們沒必要先追究具體的實現,仍是先看看AbstractExecutorService吧。

//抽象的執行服務
public abstract class AbstractExecutorService implements ExecutorService {
    
    //執行方法
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        //獲取任務數量
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        //任務集合
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        //執行完成服務
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            // 記錄異常
            ExecutionException ee = null;
            
            //超時時間線
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            //使用迭代器獲取任務
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 肯定開始一項任務
            futures.add(ecs.submit(it.next()));
            //任務數量減小
            --ntasks;
            //正在執行任務標誌
            int active = 1;
            
            //循環執行任務
            for (;;) {
                //獲取任務隊列中第一個任務
                Future<T> f = ecs.poll();
                //任務爲空,若是還有任務則執行任務(任務數量減1,提交任務到執行隊列,正在執行任務數量+1)
                //正在執行任務數爲0,說明任務執行完畢,中斷任務循環
                //如有超時檢查,則執行超時檢查機制
                //上述狀況都不知足,則取出任務隊列頭,並將其從隊列移除
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                
                //任務不爲空
                if (f != null) {
                    //正在執行標誌-1
                    --active;
                    try {
                        //返回執行結果
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //取消全部任務
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
    
    //執行方法
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        //和上面相似,這裏也是建立任務隊列
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        //迭代進行任務執行
        try {
            //建立任務,並添加到任務隊列
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            //設置超時時間標記
            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            //在執行器沒有多少多並行性的狀況下,交替執行時間檢查和調用。
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                //任務超時,返回任務隊列
                if (nanos <= 0L)
                    return futures;
            }
            
            //遍歷任務並返回任務執行結果
            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    //超時
                    if (nanos <= 0L)
                        return futures;
                    try {
                        //給定執行時間等待任務完成並返回結果
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            //未完成則取消執行
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
    
    /** *建立任務隊列 */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    
    /** * 提交任務到執行隊列 */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
}
複製代碼

經過上面的代碼咱們已經基本瞭解了一個線程池是如何建立任務隊列並執行任務的,因此在這裏咱們只須要關注一些關鍵的ThreadPoolExecutor的方法就能瞭解線程池是如何工做的,而且對應的幾種模式的線程池均可以推導出來。

首先在此次看源碼以前咱們要胡亂思索一番,整理一下線程池的執行大概流程:

線程池執行流程

咱們前面簡單的說過幾個ThreadPoolExecutor的主要參數,咱們下面再仔細總結一下:

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}
複製代碼
  • corePoolSize:該線程池中核心線程數最大值

核心線程:線程池新建線程的時候,若是當前線程總數小於corePoolSize,則新建的是核心線程,若是超過corePoolSize,則新建的線程不是核心線程。核心線程默認狀況下會一直存活在線程池中,即便這個核心線程啥也不幹(閒置狀態)。若是指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性爲true,那麼核心線程若是不幹活(閒置狀態)的話,超過必定時間(時長下面參數決定),就會被銷燬掉。

  • maximumPoolSize: 該線程池中線程總數最大值

線程總數 = 核心線程數 + 非核心線程數。

  • keepAliveTime:該線程池中非核心線程閒置超時時長

一個非核心線程,若是不幹活(閒置狀態)的時長超過這個參數所設定的時長,就會被銷燬掉,若是設置allowCoreThreadTimeOut = true,則會做用於核心線程。

  • unit:keepAliveTime的單位

TimeUnit是一個枚舉類型,其包括: NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小時 DAYS : 天

  • workQueue:線程池中的任務隊列:維護着等待執行的Runnable對象

當全部的核心線程都在幹活時,新添加的任務會被添加到這個隊列中等待處理,若是隊列滿了,則新建非核心線程執行任務。

  • threadFactory:建立線程的方式。

  • handler:異常處理程序。

既然已經知道了任務執行,那麼任務是怎麼排隊的呢?

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * 1. 若是運行的線程少於corepoolSize大小,新任務會直接開啓新的線程執行。 * 對addWorker的調用原子性地檢查運行狀態和workerCount,從而經過返回false防止假警報,假警報會在不該該的狀況下添加線程。 * * 2. 若是一個任務成功的加入隊列,咱們須要再次檢查是否須要開啓新的線程來執行。 * 可能緣由有:已有任務執行完畢,或者線程池已經被結束。 * * * 3. 若是不能對任務進行排隊,則嘗試添加一個新任務線程。 * 若是它失敗了,咱們知道咱們已經關閉或飽和了因此拒絕這個任務。 */
         
        //運行狀態標籤
        int c = ctl.get();
        //正在執行的線程數 < 核心線程數 ,則當即執行任務
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //再次檢查是否運行且沒超過任務隊列容量
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //須要執行的任務不在運行狀態且不在等待隊列,則將這個任務異常拋出
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //運行任務數量爲空,則將核心線程外的線程任務置空
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //超過核心線程數且其餘線程任務也添加失敗,拋出異常
        else if (!addWorker(command, false))
            reject(command);
    }

複製代碼

看到這裏咱們已經模模糊糊的明白了任務排隊執行,全部的任務隊列都是同樣的排隊執行,那麼咱們任務隊列又有哪些呢?

  • LinkedBlockingQueue:線性阻塞隊列。接收到任務,若是沒超過corePoolSize,則建立新線程執行,不然進入阻塞隊列等待

  • ArrayBlockingQueue:數組阻塞隊列。數組特診是長度固定,也就是這個隊列長度固定。接收到新任務,若是沒超過corePoolSize,則建立新線程執行,若是超過,則建立新線程(線程總數<maximumPoolSize)執行。若是新任務既不能在隊列中等待,又不能執行,拋出異常。

  • SynchronousQueue:同步隊列。 既然是同步隊列,說明新任務來了就執行。也就是核心線程數量無限大。

  • DelayQueue:延遲隊列,聽名字也知道任務要延遲執行,這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,纔會執行任務。

也就是說到了這裏,咱們基本已經分析了線程池的幾個核心:jdk自帶線程池種類、線程池內的線程工廠(用於生產線程)、線程池任務執行、線程池任務排隊、線程池隊列類型。咱們總結一張圖,能夠結束本篇文章,固然其餘類型的線程池具體實現,請自行查看源碼。

jdk線程池結構模型圖

思考:在Java開發中還有哪些相似的東西是這種操做的呢?

相關文章
相關標籤/搜索