造輪子-ThreadPoolExecutor

如下代碼的實現邏輯出自於公衆號 碼農翻身java

《你管這破玩意叫線程池?》web

- PS:劉欣老師在我心中是軟件技術行業的大劉。spring

線程池接口

public interface Executor {
    public void execute(Runnable r);
}
View Code

接口中只有一個抽象方法,execute(Runnable r);它接收一個Runnable,無返回值實現它的子類只須要將傳入的Runnable執行便可。數組

NewsThreadExecutor

package com.datang.bingxiang.run;

import com.datang.bingxiang.run.intr.Executor;

public class NewsThreadExecutor implements Executor {
    //每次調用都建立一個新的線程
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }

}
View Code

這個實現類最簡單也最明白,真的每次調用咱們都建立一個Thread將參數Runnable執行。這麼作的弊端就是每一個調用者發佈一個任務都須要建立一個新的線程,線程使用後就被銷燬了,對內存形成了很大的浪費。安全

SingThreadExecutor

package com.datang.bingxiang.run;

import java.util.concurrent.ArrayBlockingQueue;

import com.datang.bingxiang.run.intr.Executor;

//只有一個線程,在實例化後就啓動線程。用戶調用execute()傳遞的Runnable會添加到隊列中。
//隊列有一個固定的容量3,若是隊列滿則拋棄任務。
//線程的run方法不停的循環,從隊列裏取Runnable而後執行其run()方法。
public class SingThreadExecutor implements Executor {

    // ArrayBlockingQueue 數組類型的有界隊列
    // LinkedBlockingDeque 鏈表類型的有界雙端隊列
    // LinkedBlockingQueue 鏈表類型的有界單向隊列
    private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);

    //線程不停的從隊列獲取任務
    private Thread worker = new Thread(() -> {
        while (true) {
            try {
                //take會在獲取不到任務時阻塞。而且也有Lock鎖
                Runnable r = queue.take();
                r.run();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    });

    // 構造函數啓動線程
    public SingThreadExecutor() {
        worker.start();
    }

    @Override
    public void execute(Runnable r) {
        // 這個offer和add不一樣的是offer有Lock鎖,若是隊列滿則返回false。
        // add則是隊列滿拋出異常,而且沒有Lock鎖。
        if (!queue.offer(r)) {
            System.out.println("線程等待隊列已滿,不可加入。本次任務丟棄!");
        }
    }

}
View Code

改變下思路,此次線程池實現類只建立一個線程,調用者發佈的任務都存放到一個隊列中(隊列符合先進先出的需求)可是注意咱們設計線程池必定要選擇有界隊列,由於咱們不能無限制的往隊列中添加任務。在隊列滿後,在進來的任務就要被拒絕掉。ArrayBlockingQueue數據結構

是一個底層有數組實現的有界阻塞隊列,實例化一個ArrayBlockingQueue傳遞參數爲1,表示隊列長度最大爲1.惟一的一個工做線程也是成員變量,線程執行後不斷的自旋從隊列中獲取任務,take()方法將隊列頭的元素出隊,若隊列爲空則阻塞,這個方法是線程安全的。多線程

execute(r)方法接收到任務後,將任務添加到隊列中,offer()方法將元素添加到隊列若隊列已滿則返回false。execute(r)則直接拒絕掉本次任務。app

CorePollThreadExecutor

SingThreadExecutor線程池的缺點是隻有一個工做線程,這樣顯然是不夠靈活,CorePollThreadExecutor中增長了corePollSize核心線程數參數,由用戶規定有須要幾個工做線程。此次咱們選用的隊列爲LinkedBlockingQueue這是一個數據結構爲鏈表的有界阻塞單向隊列。
ide

initThread()方法根據corePollSize循環建立N個線程,線程建立後一樣調用take()方法從阻塞隊列中獲取元素,若獲取成功則執行Runnable的run()方法,若獲取隊列中沒有元素則阻塞。execute(r)則仍是負責將任務添加到隊列中。函數

 

CountCorePollThreadExecutor

CorePollThreadExecutor中有三個問題

1 當隊列滿時線程池直接拒絕了任務,這應該讓用戶決定被拒絕的任務如何處理。

2 線程的建立策略也應該交給用戶作處理。

3 初始化後就建立了N個核心線程數,可是這些線程可能會用不到而形成浪費。

RejectedExecutionHandler接口的實現應該讓用戶決定如何處理隊列滿的異常狀況。

package com.datang.bingxiang.run.intr;

public interface RejectedExecutionHandler {
    public void rejectedExecution();
}
View Code
package com.datang.bingxiang.run;

import com.datang.bingxiang.run.intr.RejectedExecutionHandler;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution() {
        System.out.println("隊列已經滿了!!!!當前task被拒絕");
    }

}
View Code

ThreadFactory接口的實現應該讓用戶決定建立線程的方法。

package com.datang.bingxiang.run.intr;

public interface ThreadFactory {
    public Thread newThread(Runnable r);
}
View Code
package com.datang.bingxiang.run;

import com.datang.bingxiang.run.intr.ThreadFactory;

public class CustomThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        System.out.println("建立了新的核心線程");
        return new Thread(r);
    }

}
View Code

CountCorePollThreadExecutor的構造函數接收三個參數corePollSize,rejectedExecutionHandler,threadFactory。由於如今咱們須要按需建立核心線程,因此須要一個變量workCount記錄當前已經建立的工做線程,爲了保證線程之間拿到的workCount是最新的(可見性),咱們須要給變量workCount加上volatile修飾,保證改變了的修改能被全部線程看到。execute(r)首先要調用initThread(r)判斷是否有線程被建立,若是沒有線程建立則表示工做線程數已經和核心線程數相同了,此時須要將新的任務添加到隊列中,若是隊列滿,則執行傳入的拒絕策略。重要的方法在於initThread(r)。initThread(r)方法返回true表示有工做線程被建立任務將被工做線程直接執行,無需入隊列。返回false則將任務入隊,隊列滿則執行拒絕策略。

fill變量表示核心線程數是否所有建立,爲了保證多線程的環境下不會建立多於corePoolSize個數的線程,因此須要使用同步鎖,initThread(r)都要使用鎖則會下降效率,尤爲是當工做線程數已經到達核心線程數後,因此這一塊代碼使用到了雙重判斷,當加鎖後在此判斷工做線程是否已滿。若是已滿返回false。接下來使用threadFactory工廠建立線程,在線程中使用代碼塊,保證當前任務能夠被新建立的工做線程執行。新的工做線程依然是從隊列中獲取任務並執行。線程開啓後工做線程++,若是工做線程數等於核心線程數則改變fill標記。返回true,成功建立線程,不要忘記在finally中釋放鎖。

package com.datang.bingxiang.run;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.datang.bingxiang.run.intr.Executor;
import com.datang.bingxiang.run.intr.RejectedExecutionHandler;
import com.datang.bingxiang.run.intr.ThreadFactory;

public class CountCorePollThreadExecutor implements Executor {

    // 核心線程數
    private Integer corePollSize;

    // 工做線程數,也就是線程實例的數量
    private volatile Integer workCount = 0;

    // 線程是否已滿
    private volatile boolean fill = false;

    // 拒絕策略,由調用者傳入,當隊列滿時,執行自定義策略
    private RejectedExecutionHandler rejectedExecutionHandler;

    // 線程工廠,由調用者傳入
    private ThreadFactory threadFactory;

    public CountCorePollThreadExecutor(Integer corePollSize, RejectedExecutionHandler rejectedExecutionHandler,
            ThreadFactory threadFactory) {
        this.corePollSize = corePollSize;
        this.rejectedExecutionHandler = rejectedExecutionHandler;
        this.threadFactory = threadFactory;
    }

    // 此次使用鏈表類型的單向隊列
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1);

    @Override
    public void execute(Runnable r) {
        // 若是沒有建立線程
        if (!initThread(r)) {
            // offer和ArrayBlockingQueue的offer相同的做用
            if (!queue.offer(r)) {
                rejectedExecutionHandler.rejectedExecution();
            }
        }

    }

    // 同步鎖,由於判斷核心線程數和工做線程數的操做須要線程安全
    Lock lock = new ReentrantLock();

    public boolean initThread(Runnable r) {
        // 若是工做線程沒有建立滿則須要建立。
        if (!fill) {
            try {
                lock.lock();// 把鎖 加在判斷裏邊是爲了避免讓每次initThread方法執行時都加鎖
                // 此處進行雙重判斷,由於可能由於多線程緣由多個線程都判斷工做線程沒有建立滿,可是沒關係
                // 只有一個線程能夠進來,若是後續線程二次判斷已經滿了就直接返回。
                if (fill) {
                    return false;
                }
                Thread newThread = threadFactory.newThread(() -> {
                    // 由於線程是由任務觸發建立的,因此先把觸發線程建立的任務執行掉。
                    {
                        r.run();
                    }

                    while (true) {
                        // 而後該線程則不停的從隊列中獲取任務
                        try {
                            Runnable task = queue.take();
                            task.run();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                newThread.start();
                // 工做線程數+1
                workCount++;
                // 若是工做線程數已經與核心線程數相等,則不可建立
                if (workCount == corePollSize) {
                    fill = true;
                }
                return true;
            } finally {
                lock.unlock();// 釋放鎖
            }
        } else {
            // 工做線程已滿則不建立
            return false;
        }

    }

}
View Code

ThreadPoolExecutor

最後考慮下,當工做線程數到達核心線程數後,隊列也滿了之後,任務就被拒絕了。能不能想個辦法,當工做線程滿後,多增長几個線程工做,當任務很少時在將擴展的線程銷燬。ThreadPoolExecutor的構造函數中新增三個參數maximumPoolSize最大線程數keepAliveTime空閒時間,unit空閒時間的單位。

和CountCorePollThreadExecutor相比較在流程上講咱們只須要在隊列滿時判斷工做線程是否和最大線程數相等,若是不相等則建立備用線程,而且在備用線程長時間不工做時須要銷燬掉工做線程。create()方法雙重判斷workCount==maximumPoolSize若是已經相等表示已經不能建立線程了,此時只能執行拒絕策略。不然建立備用線程,備用線程建立後自旋的執行poll(l,u)方法,該方法也是取出隊列頭元素,和take()不一樣的是,poll若是一段時間後仍然從隊列中拿不到元素(隊列爲空)則返回null,此時咱們須要將該備用線程銷燬。在建立線程後將workCount++。此外須要注意,由於當前隊列滿了,因此纔會建立備用線程因此不要將當前的任務給忘了,LinkedBlockingQueue的put(r)方法會阻塞的添加元素,直到添加成功。最後 stop()判讀若是workCount>corePollSize則在線程安全的環境下將線程中止,而且將workCount--。

package com.datang.bingxiang.run;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.datang.bingxiang.run.intr.Executor;
import com.datang.bingxiang.run.intr.RejectedExecutionHandler;
import com.datang.bingxiang.run.intr.ThreadFactory;

public class ThreadPoolExecutor implements Executor {

    // 核心線程數
    private Integer corePollSize;

    // 工做線程數,也就是線程實例的數量
    private Integer workCount = 0;

    // 當隊列滿時,須要建立新的Thread,maximumPoolSize爲最大線程數
    private Integer maximumPoolSize;

    // 當任務很少時,須要刪除多餘的線程,keepAliveTime爲空閒時間
    private long keepAliveTime;

    // unit爲空閒時間的單位
    private TimeUnit unit;

    // 線程是否已滿
    private boolean fill = false;

    // 拒絕策略,由調用者傳入,當隊列滿時,執行自定義策略
    private RejectedExecutionHandler rejectedExecutionHandler;

    // 線程工廠,由調用者傳入
    private ThreadFactory threadFactory;

    // 此次使用鏈表類型的單向隊列
    BlockingQueue<Runnable> workQueue;

    public ThreadPoolExecutor(Integer corePollSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler rejectedExecutionHandler) {
        this.corePollSize = corePollSize;
        this.rejectedExecutionHandler = rejectedExecutionHandler;
        this.threadFactory = threadFactory;
        this.workQueue = workQueue;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
    }

    @Override
    public void execute(Runnable r) {
        // 若是沒有建立線程
        if (!initThread(r)) {
            // offer和ArrayBlockingQueue的offer相同的做用
            if (!workQueue.offer(r)) {
                // 隊列滿了之後先不走拒絕策略而是查詢線程數是否到達最大線程數
                if (create()) {
                    Thread newThread = threadFactory.newThread(() -> {
                        while (true) {
                            // 而後該線程則不停的從隊列中獲取任務
                            try {
                                Runnable task = workQueue.poll(keepAliveTime, unit);
                                if (task == null) {
                                    stop();
                                } else {
                                    task.run();
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                    newThread.start();
                    // 工做線程數+1
                    workCount++;
                    // 增長線程後,還須要將本應該被拒絕的任務添加到隊列
                    try {
                        // 這個put()方法會在隊列滿時阻塞添加,直到添加成功
                        workQueue.put(r);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    rejectedExecutionHandler.rejectedExecution();
                }
            }
        }

    }

    Lock clock = new ReentrantLock();

    private boolean create() {
        //雙重檢查
        if (workCount == maximumPoolSize) {
            return false;
        }
        try {
            clock.lock();
            if (workCount < maximumPoolSize) {
                return true;
            } else {
                return false;
            }
        } finally {
            clock.unlock();
        }

    }

    Lock slock = new ReentrantLock();

    // 銷燬線程
    private void stop() {
        slock.lock();
        try {
            if (workCount > corePollSize) {
                System.out.println(Thread.currentThread().getName() + "線程被銷燬");
                workCount--;
                Thread.currentThread().stop();
            }
        } finally {
            slock.unlock();
        }

    }

    // 獲取當前的工做線程數
    public Integer getworkCount() {
        return workCount;
    }

    // 同步鎖,由於判斷核心線程數和工做線程數的操做須要線程安全
    Lock lock = new ReentrantLock();

    public boolean initThread(Runnable r) {
        // 若是工做線程沒有建立滿則須要建立。
        if (!fill) {
            try {
                lock.lock();// 把鎖 加在判斷裏邊是爲了避免讓每次initThread方法執行時都加鎖
                // 此處進行雙重判斷,由於可能由於多線程緣由多個線程都判斷工做線程沒有建立滿,可是沒關係
                // 只有一個線程能夠進來,若是後續線程二次判斷已經滿了就直接返回。
                if (fill) {
                    return false;
                }
                Thread newThread = threadFactory.newThread(() -> {
                    // 由於線程是由任務觸發建立的,因此先把觸發線程建立的任務執行掉。
                    {
                        r.run();
                    }

                    while (true) {
                        // 而後該線程則不停的從隊列中獲取任務
                        try {
                            Runnable task = workQueue.take();
                            task.run();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                });
                newThread.start();
                // 工做線程數+1
                workCount++;
                // 若是工做線程數已經與核心線程數相等,則不可建立
                if (workCount == corePollSize) {
                    fill = true;
                }
                return true;
            } finally {
                lock.unlock();// 釋放鎖
            }
        } else {
            // 工做線程已滿則不建立
            return false;
        }
    }

}
View Code

 

 

測試代碼

package com.datang.bingxiang.run.test;

import java.time.LocalDateTime;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import com.datang.bingxiang.run.CorePollThreadExecutor;
import com.datang.bingxiang.run.CountCorePollThreadExecutor;
import com.datang.bingxiang.run.CustomRejectedExecutionHandler;
import com.datang.bingxiang.run.CustomThreadFactory;
import com.datang.bingxiang.run.NewsThreadExecutor;
import com.datang.bingxiang.run.SingThreadExecutor;
import com.datang.bingxiang.run.ThreadPoolExecutor;
import com.datang.bingxiang.run.intr.Executor;

@RestController
public class TestController {



    private int exe1Count = 1;
    Executor newsThreadExecutor = new NewsThreadExecutor();

    // 每次都建立新的線程執行
    @GetMapping(value = "exe1")
    public String exe1() {
        newsThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe1Count++);
        });
        return "success";
    }

    /*
     * 等待隊列長度爲1,三個線程加入,第一個加入後會迅速的出隊列。剩下兩個只有一個能夠成功 加入,另外一個 則會被丟棄
     */
    private int exe2Count = 1;
    Executor singThreadExecutor = new SingThreadExecutor();

    @GetMapping(value = "exe2")
    public String exe2() {
        singThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe2Count++);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    private int exe3Count = 1;
    Executor corePollThreadExecutor = new CorePollThreadExecutor(2);

    @GetMapping(value = "exe3")
    public String exe3() {
        corePollThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe3Count++);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    private int exe4Count = 1;
    Executor countCorePollThreadExecutor = new CountCorePollThreadExecutor(2, new CustomRejectedExecutionHandler(),
            new CustomThreadFactory());

    @GetMapping(value = "exe4")
    public String exe4() {
        countCorePollThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe4Count++);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    // 第一次建立線程並執行 1
    // 第二次進入隊列 2
    // 第三次建立線程取出隊列中的2,將3添加到隊列
    // 第四次拒絕
    // 等待3秒後只剩下一個隊列
    private int exe5Count = 1;
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1), new CustomThreadFactory(), new CustomRejectedExecutionHandler());

    @GetMapping(value = "exe5")
    public String exe5() {
        threadPoolExecutor.execute(() -> {
            System.out.println("正在執行" + exe5Count++);
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    @GetMapping(value = "workCount")
    public Integer getWorkCount() {
        return threadPoolExecutor.getworkCount();
    }
}
View Code
相關文章
相關標籤/搜索