Java 多線程:線程池實現原理

前言

文章主要來自:點這裏 。這也是博主的博客,主要分享了本身接觸過的一些後端技術,有不對的地方但願能夠提出。後端

線程池的相關類

咱們都知道,所謂線程池,那麼就是至關於有一個池子,線程就放在這個池子中進行重複利用,可以減去了線程的建立和銷燬所帶來的代價。可是這樣並不能很好的解釋線程池的原理,下面從代碼的角度分析一下線程池的實現。性能

對於原理,在 Java 中,有幾個接口,類 值得咱們關注:this

  • Executor線程

  • ExecutorServicecode

  • AbstractExecutorService對象

  • ThreadPoolExecutor繼承

Executor

public interface Executor {
    void execute(Runnable command);
}

Executor 接口只有一個 方法,execute,而且須要 傳入一個 Runnable 類型的參數。那麼它的做用天然是 具體的執行參數傳入的任務。接口

ExecutorService

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    ......
}

ExecutorService 接口繼承了 Executor,而且提供了一些其餘的方法,好比說:隊列

  • shutdownNow : 關閉線程池,返回放入了線程池,可是還沒開始執行的線程。rem

  • submit : 執行的任務 容許擁有返回值。

  • invokeAll : 運行把任務放進集合中,進行批量的執行,而且能有返回值

這三個方法也能夠說是這個接口重點擴展的方法。

Ps:execute 和 submit 區別:

  • submit 有返回值,execute 沒有返回值。 因此說能夠根據任務有無返回值選擇對應的方法。

  • submit 方便異常的處理。 若是任務可能會拋出異常,並且但願外面的調用者可以感知這些異常,那麼就須要調用 submit 方法,經過捕獲 Future.get 拋出的異常。

AbstractExecutorService

AbstractExecutorService 是一個抽象類,主要完成了 對 submit 方法,invokeAll 方法 的實現。 可是其實它的內部仍是調用了 execute 方法,例如:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

ThreadPoolExecutor

ThreadPoolExecutor 繼承了 AbstractExecutorService,而且實現了最重要的 execute 方法,是咱們主要須要研究的類。另外,整個線程池是如何實現的呢?

在該類中,有兩個成員變量 很是的重要:

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;

對於 workers 變量,主要存在了線程對象 Worker,Worker 實現了 Runnable 接口。而對於 workQueue 變量,主要存放了須要執行的任務。 這樣其實能夠猜到, 整個線程池的實現原理應該是 workQueue 中不斷的取出須要執行的任務,放在 workers 中進行處理。

另外,當線程池中的線程用完了以後,多餘的任務會等待,那麼這個等待的過程是 怎麼實現的呢? 其實若是熟悉 BlockingQueue,那麼就會立刻知道,是利用了 BlockingQueue 的take 方法進行處理

下面具體代碼分析:

public void execute(Runnable command) {
        ......
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        ......
    }

首先,這裏須要先理解兩個概念。咱們在建立線程池的時候,一般會指定兩個變量,一個是maximumPoolSize,另一個是 corePoolSize。

  • 對於 maximumPoolSize:指的是 線程池中最多容許有多少個線程。

  • 對於 corePoolSize: 指的是線程池中正在運行的線程。

在 線程池中,有這樣的設定,咱們加入一個任務進行執行,

  • 若是如今線程池中正在運行的線程數量大於 corePoolSize 指定的值而 小於maximumPoolSize 指定的值,那麼就會建立一個線程對該任務進行執行,一旦一個線程被建立運行。

  • 若是線程池中的線程數量大於corePoolSize,那麼這個任務執行完畢後,該線程會被回收;若是 小於corePoolSize,那麼該線程即便空閒,也不會被回收。下個任務過來,那麼就使用這個空閒線程。

對於上述代碼,首先有:

if (workerCountOf(c) < corePoolSize)

也就是說,判斷如今的線程數量是否小於corePoolSize,若是小於,那麼就建立一個線程執行該任務,也就是執行

addWorker(command, true)

若是大於,那麼就把該任務放進隊列當中,即

workQueue.offer(command)

那麼,addWorker 是幹什麼的呢?

private boolean addWorker(Runnable firstTask, boolean core) {    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
             ......
    }

在這裏能夠看到一些關鍵代碼,例如 w = new Worker(firstTask), 以及 workers.add(w); 從這裏 咱們就能夠看到,建立 線程對象 而且加入到 線程 隊列中。可是,咱們如今尚未看到具體是怎麼執行任務的,繼續追蹤
w = new Worker(firstTask),以下代碼:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        ......
        
        final Thread thread;
        
        Runnable firstTask;
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
        ......

對於 runWorker 方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } 
                ......
    }

在這段代碼中,就有不少關鍵的信息,好比說,Runnable task = w.firstTask;若是爲空,那麼就 執行 task = getTask(),若是不爲空,那麼就 進行 task.run(); 調用其方法,這裏也就是具體的執行的任務

如今知道了是怎麼樣執行具體的任務,那麼假如任務的數量 大於 線程池的數量,那麼是怎麼實現等待的呢,這裏就須要看到getTask() 的具體實現了,以下:

private Runnable getTask() {
        for (;;) {
           ......
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

這裏能夠看到, 一個 for 死循環,以及

Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

而 workQueue 是 BlockingQueue 類型,也就是帶有阻塞的功能。

這就是 線程如何等待執行的。

總結

如今就能夠知道,大體的線程池實現原理:

首先,各自存放線程和任務,其中,任務帶有阻塞。

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;

而後,在 execute 方法中 進行 addWorker(command,true),也就是建立一個線程,把任務放進去執行;或者是直接把任務放入到任務隊列中。

接着 若是是 addWorker,那麼就會 new Worker(task) -》調用其中 run() 方法,在Worker 的run() 方法中,調用 runWorker(this); 方法 -》在該方法中就會具體執行咱們的任務 task.run(); 同時這個 runWorker方法至關因而個死循環,正常狀況下就會一直取出 任務隊列中的任務來執行,這就保證了線程 不會銷燬。

因此,這也是爲何常說的線程池能夠避免線程的頻繁建立和 銷燬帶來的性能消耗。

寫在最後

  1. 寫出來,說出來才知道對不對,知道不對才能改正,改正了才能成長。

  2. 在技術方面,但願你們眼裏都容不得沙子。若是有不對的地方或者須要改進的地方但願能夠指出,萬分感謝。

相關文章
相關標籤/搜索