死磕 java線程系列之本身動手寫一個線程池

mythreadpool

(手機橫屏看源碼更方便)java


問題

(1)本身動手寫一個線程池須要考慮哪些因素?編程

(2)本身動手寫的線程池如何測試?安全

簡介

線程池是Java併發編程中常用到的技術,那麼本身如何動手寫一個線程池呢?本文彤哥將手把手帶你寫一個可用的線程池。併發

屬性分析

線程池,顧名思義它首先是一個「池」,這個池裏面放的是線程,線程是用來執行任務的。ide

首先,線程池中的線程應該是有類別的,有的是核心線程,有的是非核心線程,因此咱們須要兩個變量標識核心線程數量coreSize和最大線程數量maxSize。測試

爲何要區分是否爲核心線程呢?這是爲了控制系統中線程的數量。this

當線程池中線程數未達到核心線程數coreSize時,來一個任務加一個線程是能夠的,也能夠提升任務執行的效率。線程

當線程池中線程數達到核心線程數後,得控制一下線程的數量,來任務了先進隊列,若是任務執行足夠快,這些核心線程很快就能把隊列中的任務執行完畢,徹底沒有新增線程的必要。code

當隊列中任務也滿了,這時候光靠核心線程就沒法及時處理任務了,因此這時候就須要增長新的線程了,可是線程也不能無限制地增長,因此須要控制其最大線程數量maxSize。接口

其次,咱們須要一個任務隊列來存聽任務,這個隊列必須是線程安全的,咱們通常使用BlockingQueue阻塞隊列來充當,固然使用ConcurrentLinkedQueue也是能夠的(注意ConcurrentLinkedQueue不是阻塞隊列,不能運用在jdk的線程池中)。

最後,當任務愈來愈多而線程處理卻不及時,早晚會達到一種狀態,隊列滿了,線程數也達到最大線程數了,這時候怎麼辦呢?這時候就須要走拒絕策略了,也就是這些沒法及時處理的任務怎麼辦的一種策略,經常使用的策略有丟棄當前任務、丟棄最老的任務、調用者本身處理、拋出異常等。

根據上面的描述,咱們定義一個線程池一共須要這麼四個變量:核心線程數coreSize、最大線程數maxSize、阻塞隊列BlockingQueue、拒絕策略RejectPolicy。

另外,爲了便於給線程池一個名稱,咱們再加一個變量:線程池的名稱name。

因此咱們得出了線程池的屬性及構造方法大概以下:

public class MyThreadPoolExecutor implements Executor {
    /**
     * 線程池的名稱
     */
    private String name;
    /**
     * 核心線程數
     */
    private int coreSize;
    /**
     * 最大線程數
     */
    private int maxSize;
    /**
     * 任務隊列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒絕策略
     */
    private RejectPolicy rejectPolicy;

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }
}

任務流向分析

根據上面的屬性分析,基本上咱們已經獲得了任務流向的完整邏輯:

首先,若是運行的線程數小於核心線程數,直接建立一個新的核心線程來運行新的任務。

其次,若是運行的線程數達到了核心線程數,則把新任務入隊列。

而後,若是隊列也滿了,則建立新的非核心線程來運行新的任務。

最後,若是非核心線程數也達到最大了,那就執行拒絕策略。

mythreadpool

代碼邏輯大體以下:

@Override
    public void execute(Runnable task) {
        // 正在運行的線程數
        int count = runningCount.get();
        // 若是正在運行的線程數小於核心線程數,直接加一個線程
        if (count < coreSize) {
            // 注意,這裏不必定添加成功,addWorker()方法裏面還要判斷一次是否是確實小
            if (addWorker(task, true)) {
                return;
            }
            // 若是添加核心線程失敗,進入下面的邏輯
        }

        // 若是達到了核心線程數,先嚐試讓任務入隊
        // 這裏之因此使用offer(),是由於若是隊列滿了offer()會當即返回false
        if (taskQueue.offer(task)) {
            // do nothing,爲了邏輯清晰這裏留個空if
            // 本文由公從號「彤哥讀源碼」原創
        } else {
            // 若是入隊失敗,說明隊列滿了,那就添加一個非核心線程
            if (!addWorker(task, false)) {
                // 若是添加非核心線程失敗了,那就執行拒絕策略
                rejectPolicy.reject(task, this);
            }
        }
    }

建立線程邏輯分析

首先,建立線程的依據是正在運行的線程數量有沒有達到核心線程數或者最大線程數,因此咱們還須要一個變量runningCount用來記錄正在運行的線程數。

其次,這個變量runningCount須要在併發環境下加加減減,因此這裏須要使用到Unsafe的CAS指令來控制其值的修改,用了CAS就要給這個變量加上volatile修飾,爲了方便咱們這裏直接使用AtomicInteger來做爲這個變量的類型。

而後,由於是併發環境中,因此須要判斷runningCount < coreSize(或maxSize)(條件一)的同時修改runningCount CAS加一(條件二)成功了才表示能夠增長一個線程,若是條件一失敗則表示不能再增長線程了直接返回false,若是條件二失敗則表示其它線程先修改了runningCount的值,則重試。

最後,建立一個線程並運行新任務,且不斷從隊列中拿任務來運行,本文由公從號「彤哥讀源碼」原創。

mythreadpool

代碼邏輯以下:

private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判斷是否是真的能夠建立一個線程
        for (; ; ) {
            // 正在運行的線程數
            int count = runningCount.get();
            // 核心線程仍是非核心線程
            int max = core ? coreSize : maxSize;
            // 不知足建立線程的條件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,能夠建立線程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 線程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 建立線程並啓動
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 運行的任務
                    Runnable task = newTask;
                    // 不斷從任務隊列中取任務執行,若是取出來的任務爲null,則跳出循環,線程也就結束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 執行任務
                            task.run();
                        } finally {
                            // 任務執行完成,置爲空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

取任務邏輯分析

從隊列中取任務應該使用take()方法,這個方法會一直阻塞直至取到任務或者中斷,若是中斷了就返回null,這樣當前線程也就能夠安靜地結束了,另外還要注意中斷了記得把runningCount減一。

private Runnable getTask() {
        try {
            // take()方法會一直阻塞直到取到任務爲止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 線程中斷了,返回null能夠結束當前線程
            // 當前線程都要結束了,理應要把runningCount的數量減一
            runningCount.decrementAndGet();
            return null;
        }
    }

好了,到這裏咱們本身的線程池就寫完了,下面咱們一塊兒來想一想怎麼測試呢?

測試邏輯分析

咱們再來回顧下本身的寫的線程池的構造方法:

public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

name,這個隨便傳;

coreSize,咱們假設爲5;

maxSize,咱們假設爲10;

taskQueue,任務隊列,既然咱們設置的是有邊界的,咱們就用最簡單的ArrayBlockingQueue好吧,容量設置爲15,這樣裏面最多能夠存儲15條任務;

rejectPolicy,拒絕策略,咱們假設使用丟棄當前任務的策略,OK,咱們來實現一個。

/**
 * 丟棄當前任務
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

OK,這樣一個線程池就建立完成了,下面就是執行任務了,咱們假設經過for循環接二連三地添加100個任務好很差。

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

咱們分析下這段程序:

(1)先連續建立了5個核心線程,並執行了新任務;

(2)後面的15個任務進了隊列;

(3)隊列滿了,又連續建立了5個線程,並執行了新任務;

(4)後面的任務就沒得執行了,所有走了丟棄策略;

(5)因此真正執行成功的任務應該是 5 + 15 + 5 = 25 條任務;

運行之:

thread name: core_test2
thread name: core_test5
thread name: core_test3
thread name: core_test4
thread name: core_test1
thread name: test6
thread name: test7
thread name: test8
thread name: test9
discard one task
thread name: test10
discard one task
...省略被拒絕的任務
本文由公從號「彤哥讀源碼」原創
discard one task
running: 1570546871851: 2
running: 1570546871851: 8
running: 1570546871851: 7
running: 1570546871851: 6
running: 1570546871851: 5
running: 1570546871851: 3
running: 1570546871851: 4
running: 1570546871851: 1
running: 1570546871851: 10
running: 1570546871851: 9
running: 1570546872852: 14
running: 1570546872852: 20
running: 1570546872852: 19
running: 1570546872852: 17
running: 1570546872852: 18
running: 1570546872852: 16
running: 1570546872852: 15
running: 1570546872852: 12
running: 1570546872852: 13
running: 1570546872852: 11
running: 1570546873852: 21
running: 1570546873852: 24
running: 1570546873852: 23
running: 1570546873852: 25
running: 1570546873852: 22

能夠看到,建立了5個核心線程、5個非核心線程,成功執行了25條任務,完成沒問題,完美^^。

總結

(1)本身動手寫一個線程池須要考慮的因素主要有:核心線程數、最大線程數、任務隊列、拒絕策略。

(2)建立線程的時候要時刻警戒併發的陷阱;

彩蛋

咱們知道,jdk自帶的線程池還有兩個參數:keepAliveTime、unit,它們是幹什麼的呢?

答:它們是用來控制什麼時候銷燬非核心線程的,固然也能夠銷燬核心線程,具體的分析請期待下一章吧。

完整源碼

Executor接口

public interface Executor {
    void execute(Runnable command);
}

MyThreadPoolExecutor線程池實現類

/**
 * 自動動手寫一個線程池
 */
public class MyThreadPoolExecutor implements Executor {

    /**
     * 線程池的名稱
     */
    private String name;
    /**
     * 線程序列號
     */
    private AtomicInteger sequence = new AtomicInteger(0);
    /**
     * 核心線程數
     */
    private int coreSize;
    /**
     * 最大線程數
     */
    private int maxSize;
    /**
     * 任務隊列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒絕策略
     */
    private RejectPolicy rejectPolicy;
    /**
     * 當前正在運行的線程數,本文由公從號「彤哥讀源碼」原創
     * 須要修改時線程間當即感知,因此使用AtomicInteger
     * 或者也可使用volatile並結合Unsafe作CAS操做(參考Unsafe篇章講解)
     */
    private AtomicInteger runningCount = new AtomicInteger(0);

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

    @Override
    public void execute(Runnable task) {
        // 正在運行的線程數
        int count = runningCount.get();
        // 若是正在運行的線程數小於核心線程數,直接加一個線程
        if (count < coreSize) {
            // 注意,這裏不必定添加成功,addWorker()方法裏面還要判斷一次是否是確實小
            if (addWorker(task, true)) {
                return;
            }
            // 若是添加核心線程失敗,進入下面的邏輯
        }

        // 若是達到了核心線程數,先嚐試讓任務入隊
        // 這裏之因此使用offer(),是由於若是隊列滿了offer()會當即返回false
        if (taskQueue.offer(task)) {
            // do nothing,爲了邏輯清晰這裏留個空if
        } else {
            // 若是入隊失敗,說明隊列滿了,那就添加一個非核心線程
            if (!addWorker(task, false)) {
                // 若是添加非核心線程失敗了,那就執行拒絕策略
                rejectPolicy.reject(task, this);
            }
        }
    }

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判斷是否是真的能夠建立一個線程
        for (; ; ) {
            // 正在運行的線程數
            int count = runningCount.get();
            // 核心線程仍是非核心線程
            int max = core ? coreSize : maxSize;
            // 不知足建立線程的條件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,能夠建立線程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 線程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 建立線程並啓動
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 運行的任務,本文由公從號「彤哥讀源碼」原創
                    Runnable task = newTask;
                    // 不斷從任務隊列中取任務執行,若是取出來的任務爲null,則跳出循環,線程也就結束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 執行任務
                            task.run();
                        } finally {
                            // 任務執行完成,置爲空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

    private Runnable getTask() {
        try {
            // take()方法會一直阻塞直到取到任務爲止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 線程中斷了,返回null能夠結束當前線程
            // 當前線程都要結束了,理應要把runningCount的數量減一
            runningCount.decrementAndGet();
            return null;
        }
    }

}

RejectPolicy拒絕策略接口

public interface RejectPolicy {
    void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
}

DiscardRejectPolicy丟棄策略實現類

/**
 * 丟棄當前任務
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

測試類

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }
}
相關文章
相關標籤/搜索