手動造一個線程池(Java)

前言

​ 本次本身實現一個簡單的線程池,主要是爲了後續看ThreadPool的源碼作準備的,是從別人的代碼中改進的,從看別人的源碼中學到一些東西,因此特地把這篇文章寫出來,方便之後本身去回顧本身是如何學習。固然也但願分享出來能夠對別人產生良好的影響!html

使用Java的線程池

​ 在本身實現一個線程池以前,首先要知道怎麼用。由於知道怎麼用以後才能去理解一些代碼的編寫。關於怎麼用這裏就再也不多加贅述了,百度或者谷歌一下就好,爲了避免讓讀者花過多的時間去找,我找了一篇文章,說得比較清楚。java

總覽

咱們能夠看到,除了ThreadRunnable,其餘都是咱們本身定義的,下面咱們來逐一說明。git

在咱們開始分析以前,先說下線程池的工做流程,也方便你們後面看的時候心理有一個底。github

線程池顧名思義就是一個存放多個線程的池子。那麼在計算機語言中,咱們就是用數據結構來存放線程,在本線程池中用的是一個隊列來存放要處理任務的線程。因此在線程池一啓動,線程池裏面就應該有必定數量的線程數目了,那麼這個線程的數目是多少咱們先不用管,只須要知道有一些線程在等待用戶把所須要線程執行的任務放進池子裏面。而後線程池裏面的線程就會自動幫你執行任務啦。數據結構

固然有些人說,我執行一個任務就建立一個線程就行了呀,何須大費周章呢。咱們須要知道,來一個任務就建立一個線程,多線程

  1. 建立線程須要時間 ,影響響應速度。ide

  2. 系統資源有限,若是有數以萬計的線程須要建立,會大大消耗系統資源,會下降系統的穩定性。函數

其實有不少任務的時候,有些線程只是處理一些很輕的任務,很快就完成了,那麼若是下一個任務恰好到達的時候,以前的線程也恰好完成工做了,那麼這個線程就順便接下到來的任務,這樣的話豈不是提升了響應速度,而後又重複利用了線程,下降系統資源的損耗。豈不是一箭雙鵰。post

以前都是恰巧,那麼咱們稍微放寬一點條件。若是線程執行完任務了,就先別退出唄。而是在等待執行任務,這個線程就能夠看作被賦予執行任務的命令!**就等着任務來,任務一來,我就去執行。任務執行結束,線程就等,直到下一個任務來。周而復始,直到手動關閉!**這就是線程池的本質。學習

那麼問題來了,線程池裏面只有5個線程在等待執行任務,但是同時來了10個任務須要執行,那麼有5個任務被執行了,剩下那5個放哪裏?難道被丟棄?這可不是咱們設計線程池的初衷!你確定能夠想到,確定是拿同樣數據結構去存儲剩下的線程呀!(咱們用隊列存儲,而後稱爲工做隊列。)由於線程處理任務的時間是不必定的,確定是有些線程處理的快,有些慢。因此誰先處理的快,誰就去處理剩下的任務。正所謂能者多勞!

再拋出一個問題,假如前面5個線程執行得很慢,那麼後面那5個線程就須要等好久,這時候還不如直接建立線程去操做呢,沒錯,線程池在設計的時候也想到過這個問題,關於這個問題在後面咱們設計的時候會說道,這裏就先往下看吧!

既然涉及到多線程,那麼確定就涉及到同步的問題,對哪一個對象須要同步呢?固然是任務隊列啦。咱們須要知道頗有可能同時會有不少個線程對同一個任務隊列取任務和聽任務的,因此爲了實現同步,咱們這裏用了synchronized關鍵字實現同步,也就是對這個任務隊列加一把鎖,哪一個線程能夠拿到操做任務隊列的鎖哪一個線程就能夠領取任務。沒拿到這把鎖的線程就死等,除非被中斷或者手動關閉。

這裏須要注意的是掛起阻塞等待拿鎖的區別。

  1. 掛起阻塞是該線程拿到鎖以後調用await方法纔會進入的狀態,前提是先拿到鎖。被通知以後就會被喚醒,而後從await以後的代碼執行。

  2. 等待拿鎖是別的線程還在佔有鎖,此時的線程還沒拿到鎖,就會進入這個鎖的entrySet序列等待,直到鎖被釋放而後再去搶,搶到爲止!

通過上面的講解,咱們能夠基本瞭解了線程池的設計思想和原理,下面補充點內容。

  1. 線程池內部有兩個數據結構(隊列)分別存放須要執行任務的線程(也叫工做線程)和所須要被**執行的任務*。

  2. 線程池初始化的線程放在工做隊列裏面,用戶想要執行的任務放在任務隊列

  3. 在用戶添加任務以後,會通知工做隊列的線程去取任務啦!

  4. 工做隊列的線程若是有空而且任務隊列不爲空,哪一個線程拿到鎖哪一個線程就能夠在任務隊列取任務,而後任務隊列的任務數就-1。

  5. 不少個線程去拿鎖的時候,只能有一個線程拿到。其餘沒拿到鎖的線程不是阻塞等待,而是等待拿鎖!

  6. 若是拿到鎖以後任務隊列爲空,就掛起阻塞。若是被通知喚醒,繼續執行3 4 5 6操做。

先看看咱們這個整個線程池的流程圖,這樣設計的時候就知道怎麼回事了!

過程

BaseThreadPool

先看看這個類的基本屬性

public class BaseThreadPool extends Thread implements ThreadPool { 
	
    /*初始化線程數*/
    private int initSize;

    /*最大工做線程數*/
    private int maxSize;

    /*核心線程數*/
    private int coreSize;

    /*當前活躍線程數*/
    private  int activityCount = 0;

    /*指定任務隊列的大小數*/
    private int queueSize;

    /*建立工做線程的工廠,在構造方法由線程池規定好*/
    private ThreadFactory threadFactory;

    /*1. 任務隊列,在構造方法由線程池規定好*/
    private RunnableQueue runnableQueue;

    //2. 工做隊列
    private final static Queue<ThreadTask> threadQueue = new ArrayDeque<>();

    //3. 本線程池默認的拒絕策略
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.IgnoreDenyPolicy();

    /*4. 默認的線程工廠*/
    private final static ThreadFactory DEFAULT_THREAD_FACTORY =new DefaultThreadFactory();

    /*線程池是否關閉,默認爲false*/
    boolean isShutdown = false;

    private  long keepAliveTime;

    private  TimeUnit timeUnit ;
複製代碼

由上面的屬性咱們知道,咱們自定義的線程池這個類是依賴於幾個類的。

依次是 RunnableQueueDenyPolicyThreadFactory

而且由總覽圖咱們知道,BaseThreadPool是實現了咱們定義的ThreadPool接口和繼承了Thread類,而且重寫了run方法

run 裏面的邏輯到後面再分析,這裏能夠先跳過這裏。

@Override
    public void run() { // BaseThreadPool
        while (!isShutdown && !isInterrupted()){
            try {
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
               //到這裏就是關閉線程池了
                isShutdown = true;
                continue;
            }
// 這裏同步代碼塊,保證了每次訪問的時候都是最新的數據!
            synchronized (this){
                if(isShutdown) break;
// 任務隊列不爲空,而且當前能夠工做的線程小於coreCount,那麼說明工做線程數不夠,就先增長到maxSize
// 好比說coreSize 爲20,initSize爲10,maxSize 爲30,
// 忽然一會兒來了20分線程進來,可是工做線程只有15個,因爲某種緣由可能那15個工做現場還沒執行完,那麼此時的任務隊列確定還有剩餘的,發現此時線程還沒到coreSize
// 那麼就直接開maxSize個線程先把
                if(runnableQueue.size() > 0){
                    for (int i = runnableQueue.size(); i < maxSize; i++) {
                        newThread();
                    }
                }
// 任務隊列爲空,而且當前能夠工做的線程數大於coreCount,工做線程數太多啦!那麼就減小到coreCount
                if(runnableQueue.size() == 0 &&  activityCount > coreSize){
                    for (int i = coreSize; i < activityCount; i++) {
                        removeThread();
                    }
                }
            }
        }
    }
複製代碼

咱們先來看下BaseThreadPool的構造方法

//1 用戶傳入初始化線程數,最大線程數,核心線程數,和任務隊列的大小便可
public BaseThreadPool(int initSize, int maxSize, int coreSize,int queueSize) {
   /*這裏建立線程的工廠和拒絕策略都是用本身定義好的對象*/  this(initSize,maxSize,coreSize,queueSize,DEFAULT_THREAD_FACTORY,DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
    }

// 2
public BaseThreadPool(int initSize, int maxSize, int coreSize, int queueSize, ThreadFactory threadFactory, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
        this.initSize = initSize; //初始化線程池的初始化線程數
        this.maxSize = maxSize; // 初始化線程池能夠擁有最大的線程數
        this.coreSize = coreSize; // 這個值的意義後面說
        this.threadFactory = threadFactory; //初始化建立線程池的工廠
        //自定義存聽任務的隊列
        this.runnableQueue = new LinkRunnableQueue(queueSize,denyPolicy,this); //RunnableQueue的實現類,本身定義
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.init(); //初始化函數
    }

// ---init()

 public void init(){

        /*啓動本線程池*/
        this.start();//BaseThreadPool 繼承了 Thread,緣由後面說

        /*初始化initSize個線程在線程池中*/
        for (int i = 0; i < initSize; i++) {
            newThread();
        }
    }

// newThread()

  public void newThread(){
        /*建立工做線程,而後讓工做線程等待任務到來被喚醒*/
        Woker woker = new Woker(runnableQueue);
        Thread thread = threadFactory.createThread(woker);

        /*將線程和任務包裝在一塊兒*/
        ThreadTask threadTask = new ThreadTask(thread,woker);
        threadQueue.offer(threadTask);
        this.activityCount++;
        /*啓動剛纔新建的線程*/
        thread.start();
    }


// 再看看DefaultThreadFactory,就是
/*工廠建立一個新的線程*/
public class DefaultThreadFactory implements ThreadFactory {

    private static final AtomicInteger GROUP_COUNTER  = new AtomicInteger(0); //線程組號
    //計數
    private static  AtomicInteger COUNTER = new AtomicInteger(1);
    private static final ThreadGroup group  = new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndIncrement());

    @Override
    public Thread createThread(Runnable runnable) {
        return new Thread(group,runnable,"threadPool-" + COUNTER.getAndIncrement());
    }
}

複製代碼

這裏說明一下,咱們是能夠這樣new Thread(new Runnable(){....}).start建立而且啓動線程的。就是調用Thread須要傳入一個Runnable實例的構造函數實例化Thread類,經過重寫Runnable裏面的run方法就能夠指定線程在啓動的時候須要作的事。

咱們看到DefaultThreadFactory就只有一個建立線程的方法,就是把線程啓動後須要作的任務指定一下和重命名一下線程,就是用上面說明的方法。因此傳給須要傳給createThread方法一個實現Runnable的類。而這個類就是Woker

咱們看下Woker的代碼

//------------Woker BaseThreadPool依賴的類

/*工做線程的任務*/
public class Woker implements Runnable{
    /*任務隊列,方便後面取出任務*/
    private RunnableQueue runnableQueue;

    /*方便判斷該內部任務對應的線程是否運行,確保可見性!*/
    private volatile boolean running = true;

    public Woker(RunnableQueue runnableQueue) {
        this.runnableQueue = runnableQueue;
    }

    @Override
    public void run() {
        /*當前對應的線程正在運行而且沒有被中斷*/
        while (running && !Thread.currentThread().isInterrupted()){
            //調用take的時候,若是任務隊列沒任務就會阻塞在這,直到拿到任務
            Runnable task = runnableQueue.take();
            task.run();
        }
    }

    public void stop(){
        running = false;
    }

}

複製代碼

咱們看到run方法,這個任務就是去到任務隊列裏面取任務,而後執行。直到當前工做中止或者當前線程被中斷。而這個任務隊列就是咱們在調用構造函數的時候指定的對象,也就是這段代碼

this.runnableQueue = new LinkRunnableQueue(queueSize,denyPolicy,this);

接下來看下LinkRunnableQueue是怎麼實現的

public class LinkRunnableQueue implements RunnableQueue{//BaseThreadPool依賴的類

    //指定任務隊列的大小
    private int limit;

    //也是使用BaseThreadPool傳進來的默認拒絕策略
    private DenyPolicy denyPolicy;

    //這裏傳進BaseThreadPool實例
    private ThreadPool threadPool;

   	//這個就是真正存儲Runnable實例對象的數據結構!也就是一個鏈表
    private LinkedList<Runnable> queue = new LinkedList<>();

    //構造函數,也就是初始化這個類的屬性
    public LinkRunnableQueue(int queueSize,DenyPolicy denyPolicy,ThreadPool pool) {
        this.limit = queueSize;
        this.denyPolicy = denyPolicy;
        this.threadPool = pool;
    }

    //任務隊列添加任務,這個方法通常由線程池的execute方法調用
    @Override
    public void offer(Runnable runnable) {
        //由於任務隊列只有一個,可能會有多個線程同時操做任務隊列,因此要考慮同步問題
        //取得queue的鎖才能加入任務,拿不到所就進入queue的entrySet
        synchronized (queue){
        if(queue.size() > limit){
            //若是此時任務隊列超過限制的值,那麼就拒絕!
            denyPolicy.reject(runnable,threadPool);
        }else{
            //把任務加入到任務隊列唄
            queue.addLast(runnable);
            //喚醒等待的線程,這些線程在queue的waitSet裏面,要結合take方法
            queue.notifyAll();
        }
    }
}

   	//線程從任務隊列裏面拿任務,若是拿不到就會阻塞,直到有任務來而且搶到
    @Override
    public Runnable take() {
        //這裏以前也說過了,要先拿到鎖才能拿任務
        synchronized (queue){
            //若是任務隊列爲空,那麼確定拿不了,因此就等待唄
            while (queue.size() == 0){
                try {
                    //這個線程在這裏就等待讓出鎖,直到執行offer方法從而被喚醒,而後
                    //再從新搶到鎖,這裏是個循環,若是被喚醒後,也搶到鎖了,可是隊列
                    //仍是空的話,繼續等待
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //到這裏執行這個方法的線程就是搶到鎖了,而後獲得任務啦!
            return queue.removeFirst();
        }

    }

    //返回調用該方法時任務隊列有多少個任務在等待
    @Override
    public int size() {
       synchronized (queue){
           return queue.size();
       }
    }
}
複製代碼

代碼的註釋已經解釋得很清楚了,這裏主要是瞭解爲何Work中的Runnable task = runnableQueue.take()中沒有任務會阻塞等待,本質就是

1 拿到queue對象鎖以後,任務隊列沒任務,釋放掉真正存儲任務的對象的對象鎖,從而進入該對象的waitSet隊列裏面等待被喚醒。

2 固然若是沒拿到鎖也會一直等待拿到鎖,而後像1同樣.

若是看到這裏看不太明白的,你們能夠先回去看一下java線程的基本知識和synchronized的詳解,這樣能夠更好地把知識串聯起來!

接下來咱們再看下 工做隊列是什麼樣子。

ThreadTask在BaseThreadPool的一個內部類

//把工做線程和內部任務綁定在一塊兒
    class ThreadTask{
        Thread thread;
        Woker woker;
        public ThreadTask(Thread thread, Woker woker) {
            this.thread = thread;
            this.woker = woker;
        }
    }

複製代碼

從上面的代碼咱們知道,ThreadTask就是把一個工做線程和一個工做線程的任務封裝在一塊兒而已,這裏主要是爲了後面線程池關閉的時候可讓線程須要作的任務中止!

線程池關閉的操做 ,BaseThreadPool類的方法

/*shutdown 就要把 Woker 給中止 和 對應的線程給中斷*/
    @Override
    public void shutDown() {
        synchronized (this){
            if(isShutDown())
                return;
            //設置標誌位,讓線程池線程也執行完run方法,而後退出線程。
            isShutdown = true;
            /*所有線程中止工做*/
            for (ThreadTask task: threadQueue
                 ) {
                //1 這裏就是把Woker實例對象的running置爲false
                task.woker.stop();
                //2 中斷執行對應任務的線程
                task.thread.interrupt();
            }
        }
    }
複製代碼

能夠看到關閉線程池,就是遍歷存放工做線程的隊列,1和2都是破壞Woker對象的while循環條件,從而讓Woker對象的run方法執行結束。(這裏你們能夠看下Woker這個類的run方法就明白我說的了)

咱們在開始的時候說過,BaseThreadPool啓動的時候其實也是一個線程,在它的init方法中就調用了start方法表示執行run裏面的邏輯,以前咱們看了run的代碼,可是沒分析,如今就來分析吧

@Override 
    public void run() { //BaseThreadPool類的方法
        //還記得shutDown()方法裏面的 isShutdown = true語句嗎?
        //做用就是爲了讓這裏下一次判斷while循環的時候退出,而後執行完run啦!
        while (!isShutdown && !isInterrupted()){
            try {
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
                //若是線程池這個線程被中斷
                //到這裏就是關閉線程池了,也是把isShutdown設置爲我true!
                isShutdown = true;
                continue;
            }
// 這裏同步代碼塊,保證了每次訪問的時候都是最新的數據!
            synchronized (this){
                if(isShutdown) break;
				//任務隊列不爲空,而且當前能夠工做的線程小於coreCount,那麼說明工做 //線程數不夠,就先增長到maxSize.
				//好比說coreSize 爲20,initSize爲10,maxSize 爲30,
				//忽然一會兒來了20分線程進來,可是工做線程只有15個,因爲某種緣由可能
                //那15個工做現場還沒執行完,那麼此時的任務隊列確定還有剩餘的,發現此
                //時線程還沒到coreSize
				//那麼就直接開maxSize個線程先把
                //若是發現如今工做的的線程已通過了coreSize就先不增長線程數啦
                if(runnableQueue.size() > 0 && activityCount < coreSize){
                    for (int i = runnableQueue.size(); i < maxSize; i++) {
                        newThread();
                    }
                }
// 任務隊列爲空,而且當前能夠工做的線程數大於coreCount,工做線程數太多啦!那麼就減小到coreCount基本大小把
                if(runnableQueue.size() == 0 &&  activityCount > coreSize){
                    for (int i = coreSize; i < activityCount; i++) {
                        removeThread();
                    }
                }
            }
        }
    }


//----------removeThread()
// 線程池中去掉某個工做線程,這裏的操做是否是很相似shutDown的內容
    public void removeThread(){
        this.activityCount--;
        ThreadTask task = threadQueue.remove();
        task.woker.stop();//就是破壞Woker對象的while循環的條件
    }
複製代碼

上面的註釋講解的比較清楚,有啥不懂的多看幾篇,本身模擬一下思路就好啦!

run方法中,重要的是關於線程池中的線程數量的動態變化的部分。

coreSize:線程池基本的大小,至關於一個分界線

initSize:線程池的初始化大小,這枚啥好說的

activityCount:當前工做線程的數量

maxSIze:線程池中最大的線程數目

說一下它們之間的關係

任務隊列不爲空的狀況下

  1. activityCount < coreSize的時候,就說明線程池的數量沒到達基本大小,就新增線程,直接新增到最大!

  2. activityCount >= coreSize的時候,說明當前線程池的工做線程數量已經到達基本大小,有任務來就須要等一下啦!

注意:這裏的擴容機制只是簡單地擴容,Java中實現的線程池並非像我說那樣擴容的,這就解決了開頭的問題啦,具體的到時候仍是分析源碼的時候再說把!這裏只是簡單地實現一下!

測試

測試代碼

package blogDemo.ThreadDemo;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {
    public static void main(String[] args) {
        ThreadPool threadPool = new BaseThreadPool(4,30,6,30);
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running and done.");
            });
        }

    }
}

複製代碼

測試結果

項目代碼

github.com/JiemingLi/T…

總結

本篇文章就寫到這裏啦,你們看文章的時候能夠一邊看代碼一邊看解釋,這樣會更加容易理解,但願對讀者後面理解java自帶線程池有所幫助,下一篇文章就分析java自帶的線程池的源碼啦!

相關文章
相關標籤/搜索