Java併發——線程池Executor框架

線程池

無限制的建立線程

若採用"爲每一個任務分配一個線程"的方式會存在一些缺陷,尤爲是當須要建立大量線程時:java

  • 線程生命週期的開銷很是高
  • 資源消耗
  • 穩定性

引入線程池

任務是一組邏輯工做單元,線程則是使任務異步執行的機制。當存在大量併發任務時,建立、銷燬線程須要很大的開銷,運用線程池能夠大大減少開銷。緩存

 

 

Executor框架

說明:併發

  • Executor 執行器接口,該接口定義執行Runnable任務的方式。
  • ExecutorService 該接口定義提供對Executor的服務。
  • ScheduledExecutorService 定時調度接口。
  • AbstractExecutorService 執行框架抽象類。
  • ThreadPoolExecutor JDK中線程池的具體實現。
  • Executors 線程池工廠類。

 

 

ThreadPoolExecutor 線程池類

線程池是一個複雜的任務調度工具,它涉及到任務、線程池等的生命週期問題。要配置一個線程池是比較複雜的,尤爲是對於線程池的原理不是很清楚的狀況下,頗有可能配置的線程池不是較優的。框架

JDK中的線程池均由ThreadPoolExecutor類實現。其構造方法以下:less

 

 

參數說明:異步

corePoolSize:核心線程數。ide

maximumPoolSize:最大線程數。高併發

keepAliveTime:線程存活時間。當線程數大於core數,那麼超過該時間的線程將會被終結。工具

unit:keepAliveTime的單位。java.util.concurrent.TimeUnit類存在靜態靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDSthis

workQueue:Runnable的阻塞隊列。若線程池已經被佔滿,則該隊列用於存放沒法再放入線程池中的Runnable

 

 

另外一個構造方法:

 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

 

該方法在下面的擴展部分有更深刻的講解。其中handler表示線程池對拒絕任務的處理策略。

 

ThreadPoolExecutor的使用須要注意如下概念:

  • 若線程池中的線程數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。
  • 若線程池中的線程數量等於 corePoolSize且緩衝隊列 workQueue未滿,則任務被放入緩衝隊列。
  • 若線程池中線程的數量大於corePoolSize且緩衝隊列workQueue滿,且線程池中的數量小於maximumPoolSize,則建新的線程來處理被添加的任務。
  • 若線程池中線程的數量大於corePoolSize且緩衝隊列workQueue滿,且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的策略來處理此任務。
  • 當線程池中的線程數量大於corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止。

 

 

Executors 工廠方法

JDK內部提供了五種最多見的線程池。由Executors類的五個靜態工廠方法建立。

  • newFixedThreadPool(...)
  • newSingleThreadExecutor(...)
  • newCachedThreadPool(...)
  • newScheduledThreadPool(...)
  • newSingleThreadScheduledExecutor()

 

單線程的線程池 newSingleThreadExecutor

這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。

返回單線程的Executor,將多個任務交給此Exector時,這個線程處理完一個任務後接着處理下一個任務,若該線程出現異常,將會有一個新的線程來替代。此線程池保證全部任務的執行順序按照任務的提交順序執行。

說明:LinkedBlockingQueue會無限的添加須要執行的Runnable

 

建立固定大小的線程池 newFixedThreadPool

每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。

public static ExecutorSevice newFixedThreadPool()

返回一個包含指定數目線程的線程池,若是任務數量多於線程數目,那麼沒有沒有執行的任務必須等待,直到有任務完成爲止。

 

可緩存的線程池 newCachedThreadPool

若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。

newCachedThreadPool方法建立的線程池能夠自動的擴展線程池的容量。核心線程數量爲0。

SynchronousQueue是個特殊的隊列。 SynchronousQueue隊列的容量爲0。當試圖爲SynchronousQueue添加Runnable,則執行會失敗。只有當一邊從SynchronousQueue取數據,一邊向SynchronousQueue添加數據才能夠成功。SynchronousQueue僅僅起到數據交換的做用,並不保存線程。但newCachedThreadPool()方法沒有線程上限。Runable添加到SynchronousQueue會被馬上取出。

根據用戶的任務數建立相應的線程來處理,該線程池不會對線程數目加以限制,徹底依賴於JVM能建立線程的數量,可能引發內存不足。

 

定時任務調度的線程池 newScheduledThreadPool

建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。

例:

public class ScheduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleWithFixedDelay(new Runnable() {
            
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(new Date());
            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
    }
    
}

 

 

單線程的定時任務調度線程池 newSingleThreadScheduledExecutor

此線程池支持定時以及週期性執行任務的需求。

 

 

Executor接口

Executor是一個線程執行接口。任務執行的主要抽象不是Thead,而是Executor。

public interface Executor{
    void executor(Runnable command);
}

 

Executor將任務的提交過程與執行過程解耦,並用Runnable來表示任務。執行的任務放入run方法中便可,將Runnable接口的實現類交給線程池的execute方法,做爲它的一個參數。若是須要給任務傳遞參數,能夠經過建立一個Runnable接口的實現類來完成。

Executor能夠支持多種不一樣類型的任務執行策略。

Executor基於生產者消費者模式,提交任務的操做至關於生產者,執行任務的線程則至關於消費者。

 

ExecutorService接口

線程池接口。ExecutorService在Executor的基礎上增長了一些方法,其中有兩個核心的方法:

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

 

這兩個方法都是向線程池中提交任務,它們的區別在於Runnable在執行完畢後沒有結果,Callable執行完畢後有一個結果。這在多個線程中傳遞狀態和結果是很是有用的。另外他們的相同點在於都返回一個Future對象。Future對象能夠阻塞線程直到運行完畢(獲取結果,若是有的話),也能夠取消任務執行,固然也可以檢測任務是否被取消或者是否執行完畢。

在沒有Future以前咱們檢測一個線程是否執行完畢一般使用Thread.join()或者用一個死循環加狀態位來描述線程執行完畢。如今有了更好的方法可以阻塞線程,檢測任務執行完畢甚至取消執行中或者未開始執行的任務。

 

ScheduledExecutorService接口

ScheduledExecutorService描述的功能和Timer/TimerTask相似,解決那些須要任務重複執行的問題。這包括延遲時間一次性執行、延遲時間週期性執行以及固定延遲時間週期性執行等。固然了繼承ExecutorService的ScheduledExecutorService擁有ExecutorService的所有特性。

 

 

 

線程池生命週期

線程是有多種執行狀態的,一樣管理線程的線程池也有多種狀態。JVM會在全部線程(非後臺daemon線程)所有終止後才退出,爲了節省資源和有效釋放資源關閉一個線程池就顯得很重要。有時候沒法正確的關閉線程池,將會阻止JVM的結束。

線程池Executor是異步的執行任務,所以任什麼時候刻不可以直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。所以關閉線程池可能出現一下幾種狀況:

  • 平緩關閉:已經啓動的任務所有執行完畢,同時再也不接受新的任務。
  • 當即關閉:取消全部正在執行和未執行的任務。

另外關閉線程池後對於任務的狀態應該有相應的反饋信息。

 

啓動線程池

線程池在構造前(new操做)是初始狀態,一旦構造完成線程池就進入了執行狀態RUNNING。嚴格意義上講線程池構造完成後並無線程被當即啓動,只有進行"預啓動"或者接收到任務的時候纔會啓動線程。

線程池是處於運行狀態,隨時準備接受任務來執行。

 

關閉線程池

線程池運行中能夠經過shutdown()和shutdownNow()來改變運行狀態。

  • shutdown():平緩的關閉線程池。線程池中止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入隊列尚未開始的任務。shutdown()方法執行過程當中,線程池處於SHUTDOWN狀態。
  • shutdownNow():當即關閉線程池。線程池中止接受新的任務,同時線程池取消全部執行的任務和已經進入隊列可是尚未執行的任務。shutdownNow()方法執行過程當中,線程池處於STOP狀態。shutdownNow方法本質是調用Thread.interrupt()方法。但咱們知道該方法僅僅是讓線程處於interrupted狀態,並不會讓線程真正的中止!因此若只調用或只調用一次shutdownNow()方法,不必定會讓線程池中的線程都關閉掉,線程中必需要有處理interrupt事件的機制。

 

線程池結束

一旦shutdown()或者shutdownNow()執行完畢,線程池就進入TERMINATED狀態,即線程池就結束了。

  • isTerminating() 若是關閉後全部任務都已完成,則返回 true。
  • isShutdown() 若是此執行程序已關閉,則返回 true。

 

例:使用固定大小的線程池。並將任務添加到線程池。

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class JavaThreadPool {
    public static void main(String[] args) {
        // 建立一個可重用固定線程數的線程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();

        // 將線程放入池中進行執行
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);

        // 關閉線程池
        pool.shutdown();
    }

}


class MyThread extends Thread {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "正在執行。。。");
    }

}

 

 

Java線程池擴展

ThreadPoolExecutor線程池的執行監控

ThreadPoolExecutor中定義了三個空方法,用於監控線程的執行狀況。

ThreadPoolExecutor源碼:

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

protected void terminated() { }

 

例:使用覆蓋方法,定義新的線程池。

public class ExtThreadPoolTest {
    
    static class MyTask implements Runnable {
        public String name;
        
        public MyTask(String name) {
            super();
            this.name = name;
        }
        
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                System.out.println("執行中:"+this.name);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        
        ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行:" + ((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("執行完成:" + ((MyTask)r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("執行退出");
            }
            
        };
        
        
        for(int i=0;i<5;i++){
            MyTask task = new MyTask("Task-"+i);
            es.execute(task);
        }
        
        Thread.sleep(10);    // 等待terminated()執行
        
        es.shutdown();    // 若無該方法,主線程不會結束。
    }
    
}

 

 

 

ThreadPoolExecutor的拒絕策略

線程池不可能處理無限多的線程。因此一旦線程池中中須要執行的任務過多,線程池對於某些任務就沒法處理了。拒絕策略即對這些沒法處理的任務進行處理。可能丟棄掉這些不能處理的任務,也可能用其餘方式。

ThreadPoolExecutor類還有另外一個構造方法。該構造方法中的RejectedExecutionHandler 用於定義拒絕策略。 

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    
    .....
    
    }

 

 

JDK內部已經提供一些拒絕策略。

 

 

AbortPolicy 一旦線程不能處理,則拋出異常。

AbortPolicy源碼:

public static class AbortPolicy implements RejectedExecutionHandler {

        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

 

 

DiscardPolicy 一旦線程不能處理,則丟棄任務。

DiscardPolicy源碼:

    public static class DiscardPolicy implements RejectedExecutionHandler {

        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }


        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

    }

 

 

CallerRunsPolicy 一旦線程不能處理,則將任務返回給提交任務的線程處理。

CallerRunsPolicy源碼:

    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

 
        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

 

 

DiscardOldestPolicy 一旦線程不能處理,丟棄掉隊列中最老的任務。

DiscardOldestPolicy源碼:

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

 

 

例:自定義拒絕策略。打印並丟棄沒法處理的任務。

public class RejectedPolicyHandleTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new RejectedExecutionHandler() {
            
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 打印並丟棄。
                System.out.println(r.toString()+" is discard");
            }
        });
        
        for(int i=0;i<Integer.MAX_VALUE;i++){
            MyTask task = new MyTask("Task-"+i);
            es.execute(task);
            Thread.sleep(10);
        }
        
        es.shutdown();    // 若無該方法,主線程不會結束。
        
    }
}

 

 

ThreadFactory 線程工廠

ThreadPoolExecutor類構造器的參數其中之一即爲ThreadFactory線程工廠。

ThreadFactory用於建立線程池中的線程。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

 

ThreadFactory的實現類中通常定義線程了線程組,線程數與線程名稱。

DefaultThreadFactory源碼:

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

 

 

CompletionService接口

這裏須要稍微提一下的是CompletionService接口,它是用於描述順序獲取執行結果的一個線程池包裝器。它依賴一個具體的線程池調度,可是可以根據任務的執行前後順序獲得執行結果,這在某些狀況下可能提升併發效率。

相關文章
相關標籤/搜索