「Java併發編程」線程池相關知識點整理

爲何要用線程池?

池化技術:減小每次獲取資源的消耗,提升對資源的利用率。

線程池提供了一種限制和管理資源(包括執行一個任務)。 每一個線程池還維護一些基本統計信息,例如已完成任務的數量。編程

使用線程池的好處:數組

  • 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  • 提升響應速度。當任務到達時,任務能夠不須要的等到線程建立就能當即執行。
  • 提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。

線程池的實現原理?

execute方法源碼

public void execute(Runnable command) {
        // 若是任務爲null,則拋出異常。        if (command == null)
            throw new NullPointerException();        // ctl 中保存的線程池當前的一些狀態信息  AtomicInteger        int c = ctl.get();        //判斷當前線程池中執行的任務數量是否小於corePoolSize        if (workerCountOf(c) < corePoolSize) {
            //若是小於,則經過addWorker新建一個線程,而後,啓動該線程從而執行任務。            if (addWorker(command, true))
                return;
            c = ctl.get();        }        //經過 isRunning 方法判斷線程池狀態        //線程池處於 RUNNING 狀態纔會被而且隊列能夠加入任務        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();            // 再次獲取線程池狀態,若是線程池狀態不是 RUNNING 狀態就須要從任務隊列中移除任務。            // 並嘗試判斷線程是否所有執行完畢。同時執行拒絕策略。            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 若是當前線程池爲空就新建立一個線程並執行。            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        //經過addWorker新建一個線程,並將任務(command)添加到該線程中;
        //而後,啓動該線程從而執行任務。        //若是addWorker執行失敗,則經過reject()執行相應的拒絕策略的內容。        else if (!addWorker(command, false))
            reject(command);
    }
線程池建立線程的時候,會將線程封裝成工做線程Worker,Worker在執行完成任務以後,還會循環獲取工做隊列利的任務來執行。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//循環執行任務
                w.lock();                //若是線程池正在中止,確保線程被中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {
                    beforeExecute(wt, task);                    Throwable thrown = null;
                    try {
                        task.run();                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);                    }                } finally {
                    task = null;
                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);        }    }

「Java併發編程」線程池相關知識點整理

  1. 線程池判斷核心線程池【corePoolSize】裏的線程是否都在執行任務。若是不是,則建立一個新的工做線程來執行任務。若是核心線程池裏的線程都在執行任務,則進入下個流程。
  2. 線程池判斷工做隊列【BlockingQueue】是否已經滿。若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。
  3. 線程池判斷線程池【maximumPoolSize】的線程是否都處於工做狀態。若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略【RejectedExecutionHandler.rejectedExecution()】來處理這個任務。

「Java併發編程」線程池相關知識點整理

如何使用線程池?併發

ThreadPoolExecutor重要分析

構造方法的重要參數

ThreadPoolExecutor方法的構造參數有不少,咱們看看最長的那個就能夠了:框架

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:核心線程數定義了最小能夠同時運行的線程數量
  • maximumPoolSize:當隊列中存放的任務達到隊列容量的時候,當前能夠同時運行的線程數量變爲最大線程數。【若是使用的無界隊列,這個參數就沒啥效果】
  • workQueue: 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,若是達到核心線程數的話,新任務就會被存放在隊列中
  • keepAliveTime:當線程池中的線程數量大於 corePoolSize 的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬,而是會等待,直到等待的時間超過了 keepAliveTime纔會被回收銷燬。
  • unit:keepAliveTime 的時間單位。
  • threadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。
  • handler:飽和策略,當前同時運行的線程數量達到最大線程數量【maximumPoolSize】而且隊列也已經被放滿時,執行飽和策略。

線程池的簡單使用

public class ThreadPoolTest {
    private static final int CORE_POOL_SIZE = 5; //核心線程數
    private static final int MAX_POOL_SIZE = 10; //最大線程數
    private static final int QUEUE_CAPACITY = 100; //任務隊列的容量
    private static final Long KEEP_ALIVE_TIME = 1L; //等待時間
    public static void main(String[] args) {        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(                CORE_POOL_SIZE,                MAX_POOL_SIZE,                KEEP_ALIVE_TIME,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(QUEUE_CAPACITY),                new ThreadPoolExecutor.AbortPolicy());        for(int i = 0; i < 10 ; i ++){
            Runnable worker = new MyRunnable(""+ i); //建立任務
            threadPool.execute(worker); //經過execute提交
        }        threadPool.shutdown();        while(!threadPool.isTerminated()){
        }        System.out.println("Finished all threads");
    }}class MyRunnable implements Runnable {    private String command;    MyRunnable(String s) {
        this.command = s;
    }    @Override    public void run() {        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }    private void processCommand() {        try {            Thread.sleep(5000);
        } catch (InterruptedException e) {            e.printStackTrace();        }    }    @Override    public String toString() {        return this.command;
    }}

任務隊列有哪些?

  • ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO原則對元素進行排序。
  • LinkedBlockingQueue:基於鏈表結構的阻塞隊列,按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue,Executors.newFixedThreadPool()就是使用了這個隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,Executors.newCachedThreadPool()就是使用了這個隊列。
  • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。

飽和策略有哪些呢?

  • ThreadPoolExecutor.AbortPolicy:拋出 RejectedExecutionException來拒絕新任務的處理。【默認的飽和策略】
  • ThreadPoolExecutor.CallerRunsPolicy【提供可伸縮隊列】:調用執行本身的線程運行任務,也就是直接在調用execute方法的線程中運行(run)被拒絕的任務,若是執行程序已關閉,則會丟棄該任務。所以這種策略會下降對於新任務提交速度,影響程序的總體性能。若是您的應用程序能夠承受此延遲而且你要求任何一個任務請求都要被執行的話,你能夠選擇這個策略。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
  • ThreadPoolExecutor.DiscardPolicy: 不處理新任務,直接丟棄掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略將丟棄最先的未處理的任務請求,並執行當前任務。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }

固然,也能夠根據須要自定義拒絕策略,須要實現RejectedExecutionHandler。ide

如何建立線程池?

1、使用ThreadPoolExecutor的各類構造方法。工具

2、經過Executor框架的工具類Executors能夠建立三種類型的ThreadPoolExecutor。性能

《阿里巴巴 Java 開發手冊》中強制線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險this

Executors 返回線程池對象的弊端以下: FixedThreadPool 和 SingleThreadExecutor: 容許請求的隊列長度爲 Integer.MAX_VALUE ,可能堆積大量的請求,從而致使 OOM。 CachedThreadPool 和 ScheduledThreadPool : 容許建立的線程數量爲 Integer.MAX_VALUE ,可能會建立大量線程,從而致使 OOM。

執行execute方法和submit方法的區別?

  • execute()方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功與否;
threadPool.execute(new Runnable() {
    @Override
    public void run() {
    }}); //經過execute提交
  • submit()方法用於提交須要返回值的任務。線程池會返回一個 Future 類型的對象,經過這個 Future 對象能夠判斷任務是否執行成功,而且能夠經過 Future 的 get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用 get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務沒有執行完。
Future<Object> future = threadPool.submit(hasReturnValueTask);
try{
    Object s = future.get();
}catch(InterruptedException e){
    //處理中斷異常
}catch(ExecutionException e){
 //處理沒法執行任務異常
}finally{
 threadPool.shutdown();
}

若是以爲本文對你有幫助,能夠點贊關注支持一下spa

相關文章
相關標籤/搜索