Java併發編程系列-(6) Java線程池

6. 線程池

6.1 基本概念

在web開發中,服務器須要接受並處理請求,因此會爲一個請求來分配一個線程來進行處理。若是每次請求都新建立一個線程的話實現起來很是簡便,可是存在一個問題:若是併發的請求數量很是多,但每一個線程執行的時間很短,這樣就會頻繁的建立和銷燬線程,如此一來會大大下降系統的效率。可能出現服務器在爲每一個請求建立新線程和銷燬線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多。git

那麼有沒有一種辦法使執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務呢?這就是線程池的目的了。線程池爲線程生命週期的開銷和資源不足問題提供瞭解決方案。經過對多個任務重用線程,線程建立的開銷被分攤到了多個任務上。github

何時使用線程池?

  • 單個任務處理時間比較短
  • 須要處理的任務數量很大

使用線程池好處

  • 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  • 提升響應速度。當任務到達時,任務能夠不須要的等到線程建立就能當即執行。
  • 提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。

6.2 實現本身的線程池

實現的線程池須要知足如下基本條件:web

一、線程必須在池子已經建立好了,而且能夠保持住,要有容器保存多個線程;
二、線程還要可以接受外部的任務,運行這個任務。容器保持這個來不及運行的任務.面試

如下是線程池的具體實現:算法

線程池中實現了任務隊列,用來保存全部的任務;工做線程,來執行具體的任務。數據庫

public class MyThreadPool2 {
    // 線程池中默認線程的個數爲5
    private static int WORK_NUM = 5;
    // 隊列默認任務個數爲100
    private static int TASK_COUNT = 100;  
    
    // 用戶在構造這個池,但願的啓動的線程數
    private final int worker_num;
    // 工做線程組
    private WorkThread[] workThreads;
    // 任務隊列,做爲一個緩衝
    private final BlockingQueue<Runnable> taskQueue;

    // 建立具備默認線程個數的線程池
    public MyThreadPool2() {
        this(WORK_NUM,TASK_COUNT);
    }

    // 建立線程池,worker_num爲線程池中工做線程的個數
    public MyThreadPool2(int worker_num,int taskCount) {
        if (worker_num<=0) worker_num = WORK_NUM;
        if(taskCount<=0) taskCount = TASK_COUNT;
        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new WorkThread[worker_num];
        for(int i=0;i<worker_num;i++) {
            workThreads[i] = new WorkThread();
            workThreads[i].start();
        }
    }

    // 執行任務,其實只是把任務加入任務隊列,何時執行有線程池管理器決定
    public void execute(Runnable task) {
        try {
            taskQueue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 銷燬線程池,該方法保證在全部任務都完成的狀況下才銷燬全部線程,不然等待任務完成才銷燬
    public void destroy() {
        // 工做線程中止工做,且置爲null
        System.out.println("ready close pool.....");
        for(int i=0;i<worker_num;i++) {
            workThreads[i].stopWorker();
            workThreads[i] = null;//help gc
        }
        taskQueue.clear();// 清空任務隊列
    }

    // 覆蓋toString方法,返回線程池信息:工做線程個數和已完成任務個數
    @Override
    public String toString() {
        return "WorkThread number:" + worker_num
                + "  wait task number:" + taskQueue.size();
    }

    /**
     * 內部類,工做線程
     */
    private class WorkThread extends Thread{
        
        @Override
        public void run(){
            Runnable r = null;
            try {
                while (!isInterrupted()) {
                    r = taskQueue.take();
                    if(r!=null) {
                        System.out.println(getId()+" ready exec :"+r);
                        r.run();
                    }
                    r = null;//help gc;
                } 
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
        
        public void stopWorker() {
            interrupt();
        }
        
    }
}

如下是測試程序:後端

分別建立多個任務,並放入線程池進行執行。緩存

public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 建立3個線程的線程池
        MyThreadPool2 t = new MyThreadPool2(3,0);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        t.execute(new MyTask("testF"));
        t.execute(new MyTask("testG"));
        t.execute(new MyTask("testH"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy();// 全部線程都執行完成才destory
        System.out.println(t);
    }

    // 任務類
    static class MyTask implements Runnable {

        private String name;
        private Random r = new Random();

        public MyTask(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 執行任務
            try {
                Thread.sleep(r.nextInt(1000)+2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
            System.out.println("任務 " + name + " 完成");
        }
    }
}

6.3 Executor框架

Executor框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將」任務提交」與」任務如何運行」分離開來的機制。服務器

Executor框架的類繼承關係以下圖:網絡

Screen Shot 2019-12-11 at 10.21.49 PM.png

J.U.C中有三個Executor接口:

  • Executor:一個運行新任務的簡單接口;
  • ExecutorService:擴展了Executor接口。添加了一些用來管理執行器生命週期和任務生命週期的方法;
  • ScheduledExecutorService:擴展了ExecutorService。支持Future和按期執行任務。

下面分別進行介紹:

1. Executor接口

Executor接口只有一個execute方法,用來替代一般建立或啓動線程的方法。

public interface Executor {
    void execute(Runnable command);
}

Executor接口只有一個execute方法,用來替代一般建立或啓動線程的方法。

executor.execute(new Thread())

對於不一樣的Executor實現,execute()方法多是建立一個新線程並當即啓動,也有多是使用已有的工做線程來運行傳入的任務,也多是根據設置線程池的容量或者阻塞隊列的容量來決定是否要將傳入的線程放入阻塞隊列中或者拒絕接收傳入的線程。

2. ExecutorService接口

ExecutorService接口繼承自Executor接口,提供了管理終止的方法,以及可爲跟蹤一個或多個異步任務執行情況而生成 Future 的方法。增長了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。若是須要支持即時關閉,也就是shutDownNow()方法,則任務須要正確處理中斷。

3. ScheduledExecutorService接口

ScheduledExecutorService擴展ExecutorService接口並增長了schedule方法。調用schedule方法能夠在指定的延時後執行一個Runnable或者Callable任務。ScheduledExecutorService接口還定義了按照指定時間間隔按期執行任務的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

4. Executor框架基本使用流程

基本使用流程以下:

Picture1.png

6.4 ThreadPoolExecutor分析

ThreadPoolExecutor繼承自AbstractExecutorService,也實現了ExecutorService接口。JDK中的提供的內置線程池基本都基於ThreadPoolExecutor實現,後面會仔細介紹。

構造函數及參數意義

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

構造方法中的字段含義以下:

  • corePoolSize:線程池中核心線程數,運行的線程數<corePoolSize,就會建立新線程,>= corePoolSize,這個任務就會保存到BlockingQueue,若是調用prestartAllCoreThreads()方法就會一次性的啓動corePoolSize個數的線程。
  • maximumPoolSize: 容許的最大線程數,BlockingQueue也滿了,< maximumPoolSize時候就會再次建立新的線程.
  • keepAliveTime: 線程空閒下來後,存活的時間,這個參數只在 >corePoolSize 纔有用.
  • TimeUnit unit: 存活時間的單位值.
  • workQueue:保存等待執行的任務的阻塞隊列,當提交一個新的任務到線程池之後, 線程池會根據當前線程池中正在運行着的線程的數量來決定對該任務的處理方式,主要有如下幾種處理方式:

    1. 使用直接切換隊列:這種方式經常使用的隊列是SynchronousQueue.
    2. 使用無界隊列:通常使用基於鏈表的阻塞隊列LinkedBlockingQueue。若是使用這種方式,那麼線程池中可以建立的最大線程數就是corePoolSize,而maximumPoolSize就不會起做用了(後面也會說到)。當線程池中全部的核心線程都是RUNNING狀態時,這時一個新的任務提交就會放入等待隊列中。
    3. 使用有界隊列:通常使用ArrayBlockingQueue。使用該方式能夠將線程池的最大線程數量限制爲maximumPoolSize,這樣可以下降資源的消耗,但同時這種方式也使得線程池對線程的調度變得更困難,由於線程池和隊列的容量都是有限的值,因此要想使線程池處理任務的吞吐率達到一個相對合理的範圍,又想使線程調度相對簡單,而且還要儘量的下降線程池對資源的消耗,就須要合理的設置這兩個數量。
  • threadFactory:它是ThreadFactory類型的變量,用來建立新線程。默認使用Executors.defaultThreadFactory() 來建立線程。使用默認的ThreadFactory來建立線程時,會使新建立的線程具備相同的NORM_PRIORITY優先級而且是非守護線程,同時也設置了線程的名稱。

  • handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。若是阻塞隊列滿了而且沒有空閒的線程,這時若是繼續提交任務,就須要採起一種策略處理該任務。

線程池提供了4種策略:

  1. AbortPolicy:直接拋出異常,這是默認策略;
  2. CallerRunsPolicy:用調用者所在的線程來執行任務;
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  4. DiscardPolicy:直接丟棄任務;

任務執行

提交任務執行,主要有execute和submit兩種方式,主要區別是後者須要有返回值。

  • execute(Runnable command)
  • Future submit(Callable task)

下面主要介紹execute的流程:

簡單來講,在執行execute()方法時且狀態一直是RUNNING時,的執行過程以下:

  1. 若是workerCount < corePoolSize,則建立並啓動一個線程來執行新提交的任務;
  2. 若是workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
  3. 若是workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;
  4. 若是workerCount >= maximumPoolSize,而且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

整個流程能夠用下圖來總結:

Picture1.png

接下來結合代碼進行分析:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * clt記錄着runState和workerCount
     */
    int c = ctl.get();
    /*
     * workerCountOf方法取出低29位的值,表示當前活動的線程數;
     * 若是當前活動線程數小於corePoolSize,則新建一個線程放入線程池中;
     * 並把任務添加到該線程中。
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷;
         * 若是爲true,根據corePoolSize來判斷;
         * 若是爲false,則根據maximumPoolSize來判斷
         */
        if (addWorker(command, true))
            return;
        /*
         * 若是添加失敗,則從新獲取ctl值
         */
        c = ctl.get();
    }
    /*
     * 若是當前線程池是運行狀態而且任務添加到隊列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 從新獲取ctl值
        int recheck = ctl.get();
        // 再次判斷線程池的運行狀態,若是不是運行狀態,因爲以前已經把command添加到workQueue中了,
        // 這時須要移除該command
        // 執行事後經過handler使用拒絕策略對該任務進行處理,整個方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * 獲取線程池中的有效線程數,若是數量是0,則執行addWorker方法
         * 這裏傳入的參數表示:
         * 1. 第一個參數爲null,表示在線程池中建立一個線程,但不去啓動;
         * 2. 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize,添加線程時根據maximumPoolSize來判斷;
         * 若是判斷workerCount大於0,則直接返回,在workQueue中新增的command會在未來的某個時刻被執行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * 若是執行到這裏,有兩種狀況:
     * 1. 線程池已經不是RUNNING狀態;
     * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。
     * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize;
     * 若是失敗則拒絕該任務
     */
    else if (!addWorker(command, false))
        reject(command);
}

addWorker方法的主要工做是在線程池中建立一個新的線程並執行,firstTask參數 用於指定新增的線程執行的第一個任務,core參數爲true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize,代碼以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取運行狀態
        int rs = runStateOf(c);
        /*
         * 這個if判斷
         * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務;
         * 接着判斷如下3個條件,只要有1個不知足,則返回false:
         * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務
         * 2. firsTask爲空
         * 3. 阻塞隊列不爲空
         * 
         * 首先考慮rs == SHUTDOWN的狀況
         * 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false;
         * 而後,若是firstTask爲空,而且workQueue也爲空,則返回false,
         * 由於隊列中已經沒有任務了,不須要再添加線程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 獲取線程數
            int wc = workerCountOf(c);
            // 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
            // 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較,
            // 若是爲false則根據maximumPoolSize來比較。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增長workerCount,若是成功,則跳出第一個for循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 若是增長workerCount失敗,則從新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據firstTask來建立Worker對象
        w = new Worker(firstTask);
        // 每個Worker對象都會建立一個線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING狀態;
                // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。
                // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize記錄着線程池中出現過的最大線程數量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啓動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

關閉線程池

關閉線程池一般有以下兩種方式:

  • shutdownNow():設置線程池的狀態,還會嘗試中止正在運行或者暫停任務的線程
  • shutdown():設置線程池的狀態,只會中斷全部沒有執行任務的線程

線程池的參數配置

一般來說,根據任務的性質來分,能夠劃分爲:計算密集型(CPU),IO密集型,混合型。

  • 計算密集型:加密,大數分解,正則等,線程數適當小一點,最大推薦:機器的Cpu核心數+1,爲何+1,防止頁缺失,(機器的Cpu核心=Runtime.getRuntime().availableProcessors();)
  • IO密集型:讀取文件,數據庫鏈接,網絡通信, 線程數適當大一點,能夠設置爲機器的Cpu核心數*2。
  • 混合型:儘可能拆分,IO密集型>>計算密集型,拆分意義不大,IO密集型~=計算密集型
    隊列的選擇上,應該使用有界,無界隊列可能會致使內存溢出,發生OOM。

線程池的狀態

線程池的運行狀態. 線程池一共有五種狀態, 分別是:

  1. RUNNING :能接受新提交的任務,而且也能處理阻塞隊列中的任務;
  2. SHUTDOWN:關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務。在線程池處於 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程當中也會調用shutdown()方法進入該狀態);
  3. STOP:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
  4. TIDYING:若是全部的任務都已終止了,workerCount (有效線程數) 爲0,線程池進入該狀態後會調用 terminated() 方法進入TERMINATED 狀態。
  5. TERMINATED:在terminated() 方法執行完後進入該狀態,默認terminated()方法中什麼也沒有作。
    進入TERMINATED的條件以下:
    • 線程池不是RUNNING狀態;
    • 線程池狀態不是TIDYING狀態或TERMINATED狀態;
    • 若是線程池狀態是SHUTDOWN而且workerQueue爲空;
    • workerCount爲0;
    • 設置TIDYING狀態成功。

下圖是線程池的狀態轉換過程,

Screen Shot 2019-12-12 at 4.39.35 PM.png

6.5 Executors內置線程池

一般開發者都是利用 Executors 提供的通用線程池建立方法,去建立不一樣配置的線程池,主要區別在於不一樣的 ExecutorService 類型或者不一樣的初始參數。
Executors 目前提供了 5 種不一樣的線程池建立配置:

  • newCachedThreadPool(),它是一種用來處理大量短期工做任務的線程池,具備幾個鮮明特色:它會試圖緩存線程並重用,當無緩存線程可用時,就會建立新的工做線程;若是線程閒置的時間超過60秒,則被終止並移出緩存;長時間閒置時,這種線程池,不會消耗什麼資源。其內部使用 SynchronousQueue 做爲工做隊列。
/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newFixedThreadPool(int nThreads),建立固定數目(nThreads)的線程,其背後使用的是無界的工做隊列,任什麼時候候最多有 nThreads 個工做線程是活動的。這意味着,若是任務數量超過了活動隊列數目,將在工做隊列中等待空閒線程出現;若是有工做線程退出,將會有新的工做線程被建立,以補足指定的數目nThreads。
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor(),它的特色在於工做線程數目被限制爲1,操做一個無界的工做隊列,因此它保證了全部任務的都是被順序執行,最多會有一個任務處於活動狀態,而且不容許使用者改動線程池實例,所以能夠避免其改變線程數目。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • newWorkStealingPool(int parallelism),這是一個常常被人忽略的線程池,Java 8 才加入這個建立方法,其內部會構建ForkJoinPool,利用Work-Stealing算法,並行地處理任務,不保證處理順序。
public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
  • newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),建立的是個 ScheduledExecutorService,能夠進行定時或週期性的工做調度,區別在於單一工做線程仍是多個工做線程。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

如下是ScheduledThreadPoolExecutor的構造函數,該類繼承於ThreadPoolExecutor,能夠看到任務存放在DelayedWorkQueue。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

類中提供了多種執行定時任務的方法,

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

總結下來,主要分三種:

  • schedule:只執行一次,任務還能夠延時執行
  • scheduleAtFixedRate:提交固定時間間隔的任務
  • scheduleWithFixedDelay:提交固定延時間隔執行的任務

注意scheduleAtFixedRate和scheduleWithFixedDelay的區別,下圖給出了二者執行任務時間上的示意圖。scheduleAtFixedRate老是間隔固定的時間來執行task,可是若是下圖中Task1執行超時,也就是超過了Fixed Time,當Task1執行完以後,Task2將馬上執行。scheduleWithFixedDelay不一樣的是,每一個任務老是在上一個任務結束以後,等待固定的Fixed Delay Time後開始執行。

Screen Shot 2019-12-12 at 2.33.08 PM.png

public class ScheduleWorkerTime implements Runnable{
    public final static int Long_8 = 8;//任務耗時8秒
    public final static int Short_2 = 2;//任務耗時2秒
    public final static int Normal_5 = 5;//任務耗時5秒

    public static SimpleDateFormat formater = new SimpleDateFormat(
            "HH:mm:ss");
    public static AtomicInteger count = new AtomicInteger(0);
    
    @Override
    public void run() {
        if(count.get()==0) {
            System.out.println("Long_8....begin:"+formater.format(new Date()));
            SleepTools.second(Long_8);
            System.out.println("Long_8....end:"+formater.format(new Date())); 
            count.incrementAndGet();
        }else if(count.get()==1) {
            System.out.println("Short_2 ...begin:"+formater.format(new Date()));
            SleepTools.second(Short_2);
            System.out.println("Short_2 ...end:"+formater.format(new Date()));
            count.incrementAndGet();            
        }else {
            System.out.println("Normal_5...begin:"+formater.format(new Date()));
            SleepTools.second(Normal_5);
            System.out.println("Normal_5...end:"+formater.format(new Date()));
            count.incrementAndGet(); 
        }
    }
    
    public static void main(String[] args) {
            ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
            //任務間隔6秒
            schedule.scheduleAtFixedRate(new ScheduleWorkerTime(),
                    0, 6000, TimeUnit.MILLISECONDS);
    }
}

代碼中定義了3個任務,分別執行8s,2s,5s,設置的固定間隔爲6s。從輸出結果能夠看到,第一個場任務結束後,第二個任務馬上開始執行,第二個任務執行完時,到了10s,此時等待2s後,第三個任務開始執行。由此能夠看到,當前序任務沒超時,後續任務會按照指定的時間進行執行;若是有超時,則會立刻執行。

執行結果以下:
Long_8....begin:14:56:27
Long_8....end:14:56:35
Short_2 ...begin:14:56:35
Short_2 ...end:14:56:37
Normal_5...begin:14:56:39
Normal_5...end:14:56:44

注意最好在提交給ScheduledThreadPoolExecutor的任務要catch異常,不然發生異常以後,程序會終止運行。

6.6 CompletionService

使用場景

當向Executor提交多個任務而且但願得到它們在完成以後的結果,若是用FutureTask,能夠循環獲取task,並調用get方法去獲取task執行結果,可是若是task還未完成,獲取結果的線程將阻塞直到task完成,因爲不知道哪一個task優先執行完畢,使用這種方式效率不會很高。

在jdk5時候提出接口CompletionService,它整合了Executor和BlockingQueue的功能,能夠更加方便在多個任務執行時,按任務完成順序獲取結果。

使用流程

CompletionService的使用流程以下:

  1. 聲明task執行載體,線程池executor;

  2. 聲明CompletionService,來包裝執行task的線程池,存放已完成狀態task的阻塞隊列,隊列默認爲基於鏈表結構的阻塞隊列LinkedBlockingQueue;

  3. 調用submit方法提交task;

  4. 調用take方法獲取已完成狀態task。

public class CompletionServiceTest {
    
    // 聲明線程池
    private static ExecutorService executorService = Executors.newFixedThreadPool(100);
    
    public void test() {
        
        // 聲明CompletionService包裝Executor
        CompletionService<Long>  completionService = new ExecutorCompletionService<Long>(executorService);
        
        final int groupNum = 10000000 / 100;
        
        for ( int i = 1; i <= 100; i++) {
            int start = (i-1) * groupNum + 1;
            int end = i * groupNum;
            
            completionService.submit(new Callable<Long>() {
                
                @Override
                public Long call() throws Exception {
                    Long sum = 0L;
                    
                    for (int j = start; j <= end; j++) {
                        sum += j;
                    }
                    return sum;
                }
            });
        }
        
        long result = 0L;
        try {
            for (int i = 0; i < 100; i++) {
                long taskResult = completionService.take().get();
                System.out.println(taskResult);
                result += taskResult;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        System.out.println("the result is " + result);
    }
    
    public static void main(String[] args) {
        new CompletionServiceTest().test();
    }
}

源碼分析

CompletionService接口提供五個方法:

  • Future submit(Callable task)
    提交Callable類型的task;

  • Future submit(Runnable task, V result)
    提交Runnable類型的task;

  • Future take() throws InterruptedException
    獲取並移除已完成狀態的task,若是目前不存在這樣的task,則等待;

  • Future poll()
    獲取並移除已完成狀態的task,若是目前不存在這樣的task,返回null;

  • Future poll(long timeout, TimeUnit unit) throws InterruptedException
    獲取並移除已完成狀態的task,若是在指定等待時間內不存在這樣的task,返回null。

CompletionService與普通用FutureTask獲取結果的最大不一樣是,能夠按照任務完成的順序返回結果。具體是如何實現的呢?

內部封裝了一個QueueingFuture對象,而且實現了done方法,在task執行完成以後將當前task添加到completionQueue。

private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

done方法將在FutureTask的finishCompletion方法中被調用。只是默認done方法是空的,completionQueue實現了該方法。

/**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

參考:

  • https://www.jianshu.com/p/c4a31f914cc7

本文由『後端精進之路』原創,首發於博客 http://teckee.github.io/ , 轉載請註明出處

搜索『後端精進之路』關注公衆號,馬上獲取最新文章和價值2000元的BATJ精品面試課程

後端精進之路.png

相關文章
相關標籤/搜索