Java多線程之Executor框架和手寫簡易的線程池

目錄

線程池

什麼是線程池

線程池一種線程使用模式,線程池會維護多個線程,等待着分配可併發執行的任務,當有任務須要線程執行時,從線程池中分配線程給該任務而不用主動的建立線程。java

線程池的好處

若是在咱們平時若是須要用到線程時,咱們通常是這樣作的:建立線程(T1),使用建立的線程來執行任務(T2),任務執行完成後銷燬當前線程(T3),這三個階段是必需要有的。api

而若是使用線程池呢?數組

線程池會預先建立好必定數量的線程,須要的時候申請使用,在一個任務執行完後也不須要將該線程銷燬,很明顯的節省了T1和T3這兩階段的時間。安全

同時咱們的線程由線程池來統一進行管理,這樣也提升了線程的可管理性。bash

手寫一個本身的線程池

如今咱們能夠簡單的理解爲線程池實際上就是存放多個線程的數組,在程序啓動是預先實例化必定得線程實例,當有任務須要時分配出去。如今咱們先來寫一個本身的線程池來理解一下線程池基本的工做過程。服務器

線程池須要些什麼?

首先線程池確定須要必定數量的線程,因此首先須要一個線程數組,固然也能夠是一個集合。網絡

線程數組是用來進行存放線程實例的,要使用這些線程就須要有任務提交過來。當任務量過大時,咱們是不可能在同一時刻給全部的任務分配一個線程的,因此咱們還須要一個用於存聽任務的容器多線程

這裏的預先初始化線程實例的數量也須要咱們來根據業務肯定。併發

同時線程實例的數量也不能隨意的定義,因此咱們還須要設置一個最大線程數框架

//線程池中容許的最大線程數
    private static int MAXTHREDNUM = Integer.MAX_VALUE;
    //當用戶沒有指定時默認的線程數
    private  int threadNum = 6;
    //線程隊列,存放線程任務
    private List<Runnable> queue;

    private WorkerThread[] workerThreads;
複製代碼

線程池工做

線程池的線程通常須要預先進行實例化,這裏咱們經過構造函數來模擬這個過程。

public MyThreadPool(int threadNum) {
        this.threadNum = threadNum;
        if(threadNum > MAXTHREDNUM)
            threadNum = MAXTHREDNUM;
        this.queue = new LinkedList<>();
        this.workerThreads = new WorkerThread[threadNum];
        init();
    }

    //初始化線程池中的線程
 private void init(){
    for(int i=0;i<threadNum;i++){
        workerThreads[i] = new WorkerThread();
        workerThreads[i].start();
    }
  }
複製代碼

在線程池準備好了後,咱們須要像線程池中提交工做任務,任務統一提交到隊列中,當有任務時,自動分發線程。

//提交任務
    public void execute(Runnable task){
        synchronized (queue){
            queue.add(task);
            //提交任務後喚醒等待在隊列的線程
            queue.notifyAll();
        }
    }
複製代碼

咱們的工做線程爲了獲取任務,須要一直監放任務隊列,當隊列中有任務時就由一個線程去執行,這裏咱們用到了前面提到的安全中斷。

private class WorkerThread extends Thread {

    private volatile boolean on = true;
    @Override
    public void run() {
        Runnable task = null;
        //判斷是否能夠取任務
        try {
            while(on&&!isInterrupted()){
                synchronized (queue){
                    while (on && !isInterrupted() && queue.isEmpty()) {
                        //這裏若是使用阻塞隊列來獲取在執行時就不會報錯
                        //報錯是由於退出時銷燬了全部的線程資源,不影響使用
                        queue.wait(1000);
                    }
                    if (on && !isInterrupted() && !queue.isEmpty()) {
                        task = queue.remove(0);
                    }

                    if(task !=null){
                        //取到任務後執行
                        task.run();
                    }
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        task = null;//任務結束後手動置空,加速回收
    }

    public void cancel(){
        on = false;
        interrupt();
    }
}
複製代碼

固然退出時還須要對線程池中的線程等進行銷燬。

//銷燬線程池
    public void shutdown(){
        for(int i=0;i<threadNum;i++){
            workerThreads[i].cancel();
            workerThreads[i] = null;
        }
        queue.clear();
    }
複製代碼

好了,到這裏咱們的一個簡易版的線程池就完成了,功能雖然很少可是線程池運行的基本原理差很少實現了,實際上很是簡單,咱們來寫個程序測試一下:

public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        // 建立3個線程的線程池
        MyThreadPool t = new MyThreadPool(3);
        CountDownLatch countDownLatch = new CountDownLatch(5);
        t.execute(new MyTask(countDownLatch, "testA"));
        t.execute(new MyTask(countDownLatch, "testB"));
        t.execute(new MyTask(countDownLatch, "testC"));
        t.execute(new MyTask(countDownLatch, "testD"));
        t.execute(new MyTask(countDownLatch, "testE"));
        countDownLatch.await();
        Thread.sleep(500);
        t.shutdown();// 全部線程都執行完成才destory
        System.out.println("finished...");
    }

    // 任務類
    static class MyTask implements Runnable {

        private CountDownLatch countDownLatch;
        private String name;
        private Random r = new Random();

        public MyTask(CountDownLatch countDownLatch, String name) {
            this.countDownLatch = countDownLatch;
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 執行任務
            try {
                countDownLatch.countDown();
                Thread.sleep(r.nextInt(1000));
                System.out.println("任務 " + name + " 完成");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
        }
    }
}

result:
任務 testA 完成
任務 testB 完成
任務 testC 完成
任務 testD 完成
任務 testE 完成
finished...
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at com.learn.threadpool.MyThreadPool$WorkerThread.run(MyThreadPool.java:75)
...
複製代碼

從結果能夠看到咱們提交的任務都被執行了,當全部任務執行完成後,咱們強制銷燬了全部線程,因此會拋出異常。

JDK中的線程池

上面咱們實現了一個簡易的線程池,稍微理解線程池的基本運做原理。如今咱們來認識一些JDK中提供了線程池吧。

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService
複製代碼

ThreadPoolExecutor是一個ExecutorService ,使用可能的幾個合併的線程執行每一個提交的任務,一般使用Executors工廠方法配置,經過Executors能夠配置多種適合不一樣場景的線程池。

ThreadPoolExecutor中的主要參數
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 
複製代碼
corePoolSize

線程池中的核心線程數,當外部提交一個任務時,線程池就建立一個新線程執行任務,直到當前線程數等於corePoolSize時再也不建立新線程; 若是當前線程數爲corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行; 若是執行了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部核心線程。

maximumPoolSize

線程池中容許的最大線程數。若是當前阻塞隊列已滿,還在繼續提交任務,則建立新的線程執行任務,前提是當前線程數小於maximumPoolSize。

keepAliveTime

線程空閒時的存活時間,即當線程沒有任務執行時,繼續存活的時間。默認狀況下,線程通常不會被銷燬,該參數只在線程數大於corePoolSize時纔有用。

workQueue

workQueue必須是阻塞隊列。當線程池中的線程數超過corePoolSize的時候,線程會進入阻塞隊列進行等待。阻塞隊列可使有界的也能夠是無界的。

threadFactory

建立線程的工廠,經過自定義的線程工廠能夠給每一個新建的線程設置一個線程名。Executors靜態工廠裏默認的threadFactory,線程的命名規則是「pool-{數字}-thread-{數字}」。

RejectedExecutionHandler

線程池的飽和處理策略,當阻塞隊列滿了,且沒有空閒的工做線程,若是繼續提交任務,必須採起一種策略處理該任務,線程池提供了4種策略:

  • AbortPolicy:直接拋出異常,默認的處理策略
  • CallerRunsPolicy:使用調用者所屬的線程來執行當前任務
  • DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務
  • DiscardPolicy:直接丟棄該任務 若是上述提供的處理策略沒法知足業務需求,也能夠根據場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。
ThreadPoolExecutor中的主要執行流程

線程池

//圖片來自網絡

  1. 線程池判斷核心線程池裏的線程(corePoolSize)是否都在執行任務。若是不是,則建立一個新的工做線程來執行任務。若是核心線程池裏的線程都在執行任務,則進入2。
  2. 線程池判斷工做隊列(workQueue)是否已滿。若是工做隊列沒有滿,則將新提交的任務存儲在該隊列裏。若是工做隊列滿了,則進入3。
  3. 線程池判斷線程池的線程(maximumPoolSize)是否都處於工做狀態。若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。

這裏須要注意的是核心線程池大小指得是corePoolSize參數,而線程池工做線程數指的是maximumPoolSize。

Executor

實際上咱們在使用線程池時,並不必定須要本身來定義上面介紹的參數的值,JDK爲咱們提供了一個調度框架。經過這個調度框架咱們能夠輕鬆的建立好線程池以及異步的獲取任務的執行結果。

調度框架的組成

任務

通常是指須要被執行的任務,多爲使用者提供。被提交的任務須要實現Runnable接口或Callable接口。

任務的執行

Executor是任務執行機制的核心接口,其將任務的提交和執行分離開來。ExecutorService繼承了Executor並作了一些擴展,能夠產生Future爲跟蹤一個或多個異步任務執行。任務的執行主要是經過實現了Executor和ExecutorService接口的類來進行實現。例如:ThreadPoolExecutor和ScheduledThreadPoolExecutor。

結果獲取

對結果的獲取能夠經過Future接口以及其子類接口來實現。Future接口提供了一系列諸如檢查是否就緒,是否執行完成,阻塞以及獲取結果等方法。

Executors工廠中的線程池

FixedThreadPool
new ThreadPoolExecutor(nThreads, nThreads, 0L, 
                        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
複製代碼

該線程池中corePoolSize和maximumPoolSize參數一致。同時使用無界阻塞隊列,將會致使maximumPoolSize和keepAliveTime已經飽和策略無效,由於隊列會一直接收任務,直到OOM。

SingleThreadExecutor
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>())
複製代碼

該線程池中corePoolSize和maximumPoolSize都爲1,表示始終只有一個線程在工做,適用於須要保證順序地執行各個任務;而且在任意時間點,不會有多個線程是活動的應用場景。同時使用無界阻塞隊列,當任務多時極有可能OOM。

CachedThreadPool
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>()
複製代碼

CachedThreadPool類型的線程池corePoolSize爲0,表示任務將會提交給隊列,可是SynchronousQueue又是一個不包含任何容量的隊列。因此每個任務提交過來都會建立一個新的線程來執行,該類型的線程池適用於執行不少的短時間異步任務的程序,或者是負載較輕的服務器。若是當任務的提交速度一旦超過任務的執行速度,在極端狀況下可能會由於建立過多線程而耗盡CPU和內存資源。

ScheduledThreadPool

對於定時任務類型的線程池,Executor能夠建立兩種不一樣類型的線程池:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor,前者是包含若干個線程的ScheduledThreadPoolExecutor,後者是隻包含一個的ScheduledThreadPoolExecutor。

ScheduledThreadPoolExecutor適用於須要多個後臺線程執行週期任務,同時爲了知足資源管理的需求而須要限制後臺線程的數量的應用場景。

SingleThreadScheduledExecutor適用於須要單個後臺線程執行週期任務,同時須要保證順序地執行各個任務的應用場景。

new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
複製代碼

在對該類型線程池進行實例化時,咱們能夠看到maximumPoolSize設置爲了Integer的最大值,因此很明顯在極端狀況下和CachedThreadPool類型同樣可能會由於建立過多線程而耗盡CPU和內存資源。

DelayedWorkQueue是一種延時阻塞隊列,此隊列的特色爲其中元素只能在其延遲到期時才被使用。ScheduledThreadPool類型在執行任務時和其餘線程池有些不一樣。

  1. ScheduledThreadPool類型線程池中的線程(假設如今線程A開始取任務)從DelayedWorkQueue中取已經到期的任務。
  2. 線程A獲取到任務後開始執行。
  3. 任務執行完成後設置該任務下一次執行的時間。
  4. 將該任務從新放入到線程池中。

ScheduledThreadPool中存在着定時任務和延時任務兩種。

延時任務經過schedule(...)方法以及重載方法和scheduleWithFixedDelay實現,延時任務經過設置某個時間間隔後執行,schedule(...)僅執行一次。

定時任務由scheduleAtFixedRate實現。該方法建立並執行在給定的初始延遲以後,隨後以給定的時間段進行週期性動做,即固定時間間隔的任務。

特殊的scheduleWithFixedDelay方法是建立並執行在給定的初始延遲以後首先啓用的按期動做,隨後在一個執行的終止和下一個執行的開始之間給定的延遲,即固定延時間隔的任務。

固定時間間隔的任務不論每次任務花費多少時間,下次任務開始執行時間是肯定的。對於scheduleAtFixedRate方法中,若任務處理時長超出設置的定時頻率時長,本次任務執行完纔開始下次任務,下次任務已經處於超時狀態,會立刻開始執行。若任務處理時長小於定時頻率時長,任務執行完後,定時器等待,下次任務會在定時器等待頻率時長後執行。

固定延時間隔的任務是指每次執行完任務之後都等待一個固定的時間。因爲操做系統調度以及每次任務執行的語句可能不一樣,因此每次任務執行所花費的時間是不肯定的,也就致使了每次任務的執行週期存在必定的波動。

須要注意的是定時或延時任務中所涉及到時間、週期不能保證明時性及準確性,實際運行中會有必定的偏差。

Callable/Future

在介紹實現多線程的時候咱們有簡單介紹過Runnable和Callable的,這二者基本相同,不一樣在於Callable能夠返回一個結果,而Runnable不返回結果。對於Callable接口的使用方法和Runnable基本相同,同時咱們也能夠選擇是否對結果進行接收處理。在Executors中提供了將Runnable轉換爲Callable的api:Callable<Object> callable(Runnable task)

Future是一個用於接收Runnable和Callable計算結果的接口,固然它還提供了查詢任務狀態,中斷或者阻塞任務以及查詢結果的能力。

boolean cancel(boolean mayInterruptIfRunning)  //嘗試取消執行此任務。  
V get()  //等待計算完成,而後檢索其結果。  
V get(long timeout, TimeUnit unit) //等待最多在給定的時間,而後檢索其結果(若是可用)。  
boolean isCancelled() //若是此任務在正常完成以前被取消,則返回 true 。  
boolean isDone() //若是任務已完成返回true複製代碼

FutureTask是對Future的基本實現,具備啓動和取消計算的方法,查詢計算是否完整,並檢索計算結果。FutureTask對Future作了必定得擴展:

void run() //將此future設置爲其計算結果,除非已被取消。  
protected boolean runAndReset()  //執行計算而不設置其結果,而後重置爲初始狀態,若是計算遇到異常或被取消,則不執行此操做。  
protected void set(V v) //將此Future的結果設置爲給定值,除非Future已被設置或已被取消。  
protected void setException(Throwable t) //除非已經設置了此 Future 或已將其取消,不然它將報告一個 ExecutionException,並將給定的 throwable 做爲其緣由。  
複製代碼

FutureTask除了實現Future接口外,還實現了Runnable接口。因此FutureTask能夠由Executor執行,也能夠由調用線程直接執行futureTask.run()。

當FutureTask處於未啓動或已啓動狀態時,執行FutureTask.get()方法將致使調用線程阻塞;

當FutureTask處於已完成狀態時,執行FutureTask.get()方法將致使調用線程當即返回結果或拋出異常。

當FutureTask處於未啓動狀態時,執行FutureTask.cancel()方法將致使此任務永遠不會被執行;

當FutureTask處於已啓動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務線程的方式來嘗試中止該任務;

當FutureTask處於已啓動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成)。

關因而否使用Executors

在以前阿里巴巴出的java開發手冊中,有明確提出禁止使用Executors:

【強制】線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式, 這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。

在上面咱們分析過使用Executors建立的幾種線程池的使用場景和缺點,大多數狀況下出問題在於可能致使OOM,在我實際使用中基本沒有遇到過這樣的狀況。可是考慮到阿里巴巴這樣體量的併發請求,可能遇到這種狀況的概率較大。因此咱們仍是應該根據實際狀況考慮是否使用,固然實際遵循阿里巴巴開發手冊來可能會更好一點,畢竟這是國類頂尖公司常年在生產中積累下的經驗。

最後,在本節中只是簡單介紹線程池及其基本原理,幫助更好的理解線程池。並不涉及具體如何使用。

相關文章
相關標籤/搜索