java線程池ThreadPoolExecutor類使用詳解

在《阿里巴巴java開發手冊》中指出了線程資源必須經過線程池提供,不容許在應用中自行顯示的建立線程,這樣一方面是線程的建立更加規範,能夠合理控制開闢線程的數量;另外一方面線程的細節管理交給線程池處理,優化了資源的開銷。而線程池不容許使用Executors去建立,而要經過ThreadPoolExecutor方式,這一方面是因爲jdk中Executor框架雖然提供瞭如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等建立線程池的方法,但都有其侷限性,不夠靈活;另外因爲前面幾種方法內部也是經過ThreadPoolExecutor方式實現,使用ThreadPoolExecutor有助於你們明確線程池的運行規則,建立符合本身的業務場景須要的線程池,避免資源耗盡的風險。java

下面咱們就對ThreadPoolExecutor的使用方法進行一個詳細的概述。緩存

首先看下ThreadPoolExecutor的構造函數安全

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

構造函數的參數含義以下:微信

corePoolSize:指定了線程池中的線程數量,它的數量決定了添加的任務是開闢新的線程去執行,仍是放到workQueue任務隊列中去;併發

maximumPoolSize:指定了線程池中的最大線程數量,這個參數會根據你使用的workQueue任務隊列的類型,決定線程池會開闢的最大線程數量;框架

keepAliveTime:當線程池中空閒線程數量超過corePoolSize時,多餘的線程會在多長時間內被銷燬;函數

unit:keepAliveTime的單位優化

workQueue:任務隊列,被添加到線程池中,但還沒有被執行的任務;它通常分爲直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列幾種;this

threadFactory:線程工廠,用於建立線程,通常用默認便可;spa

handler:拒絕策略;當任務太多來不及處理時,如何拒絕任務;

接下來咱們對其中比較重要參數作進一步的瞭解:

1、workQueue任務隊列

上面咱們已經介紹過了,它通常分爲直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列;

一、直接提交隊列:設置爲SynchronousQueue隊列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,沒執行一個插入操做就會阻塞,須要再執行一個刪除操做纔會被喚醒,反之每個刪除操做也都要等待對應的插入操做。

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize設置爲2 ,拒絕策略爲AbortPolic策略,直接拋出異常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            readPool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

輸出結果爲

pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)

能夠看到,當任務隊列爲SynchronousQueue,建立的線程數大於maximumPoolSize時,直接執行了拒絕策略拋出異常。

使用SynchronousQueue隊列,提交的任務不會被保存,老是會立刻提交執行。若是用於執行任務的線程數量小於maximumPoolSize,則嘗試建立新的進程,若是達到maximumPoolSize設置的最大值,則根據你設置的handler執行拒絕策略。所以這種方式你提交的任務不會被緩存起來,而是會被立刻執行,在這種狀況下,你須要對你程序的併發量有個準確的評估,才能設置合適的maximumPoolSize數量,不然很容易就會執行拒絕策略;

二、有界的任務隊列:有界的任務隊列可使用ArrayBlockingQueue實現,以下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任務隊列,如有新的任務須要執行時,線程池會建立新的線程,直到建立的線程數量達到corePoolSize時,則會將新的任務加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續建立線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大於maximumPoolSize,則執行拒絕策略。在這種狀況下,線程數量的上限與有界任務隊列的狀態有直接關係,若是有界隊列初始容量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize如下,反之當任務隊列已滿時,則會以maximumPoolSize爲最大線程數上限。

三、無界的任務隊列:有界任務隊列可使用LinkedBlockingQueue實現,以下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用無界任務隊列,線程池的任務隊列能夠無限制的添加新的任務,而線程池建立的最大線程數量就是你corePoolSize設置的數量,也就是說在這種狀況下maximumPoolSize這個參數是無效的,哪怕你的任務隊列中緩存了不少未執行的任務,當線程池的線程數達到corePoolSize後,就不會再增長了;若後續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,必定要注意你任務提交與處理之間的協調與控制,否則會出現隊列中的任務因爲沒法及時處理致使一直增加,直到最後資源耗盡的問題。

4、優先任務隊列:優先任務隊列經過PriorityBlockingQueue實現,下面咱們經過一個例子演示下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //優先任務隊列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    
    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public ThreadTask() {
        
    }
    
    public ThreadTask(int priority) {
        this.priority = priority;
    }

    //當前對象和其餘對象作比較,當前優先級大就返回-1,優先級小就返回1,值越小優先級越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    }
    
    public void run() {
        try {
            //讓線程阻塞,使後續任務進入緩存隊列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}

咱們來看下執行的結果狀況

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

你們能夠看到除了第一個任務直接建立線程執行外,其餘的任務都被放入了優先任務隊列,按優先級進行了從新排列執行,且線程池的線程數一直爲corePoolSize,也就是隻有一個。

經過運行的代碼咱們能夠看出PriorityBlockingQueue它實際上是一個特殊的無界隊列,它其中不管添加了多少個任務,線程池建立的線程數也不會超過corePoolSize的數量,只不過其餘隊列通常是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列能夠自定義規則根據任務的優先級順序前後執行。

2、拒絕策略

通常咱們建立線程池時,爲防止資源被耗盡,任務隊列都會選擇建立有界任務隊列,但種模式下若是出現任務隊列已滿且線程池建立的線程數達到你設置的最大線程數時,這時就須要你指定ThreadPoolExecutor的RejectedExecutionHandler參數即合理的拒絕策略,來處理線程池"超載"的狀況。ThreadPoolExecutor自帶的拒絕策略以下:

一、AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工做;

二、CallerRunsPolicy策略:若是線程池的線程數量達到上限,該策略會把任務隊列中的任務放在調用者線程當中運行;

三、DiscardOledestPolicy策略:該策略會丟棄任務隊列中最老的一個任務,也就是當前任務隊列中最早被添加進去的,立刻要被執行的那個任務,並嘗試再次提交;

四、DiscardPolicy策略:該策略會默默丟棄沒法處理的任務,不予任何處理。固然使用此策略,業務場景中需容許任務的丟失;

以上內置的策略均實現了RejectedExecutionHandler接口,固然你也能夠本身擴展RejectedExecutionHandler接口,定義本身的拒絕策略,咱們看下示例代碼:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義拒絕策略
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"執行了拒絕策略");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //讓線程阻塞,使後續任務進入緩存隊列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}

輸出結果:

com.hhxx.test.ThreadTask@33909752執行了拒絕策略
com.hhxx.test.ThreadTask@55f96302執行了拒絕策略
com.hhxx.test.ThreadTask@3d4eac69執行了拒絕策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

能夠看到因爲任務加了休眠阻塞,執行須要花費必定時間,致使會有必定的任務被丟棄,從而執行自定義的拒絕策略;

3、ThreadFactory自定義線程建立

 線程池中線程就是經過ThreadPoolExecutor中的ThreadFactory,線程工廠建立的。那麼經過自定義ThreadFactory,能夠按須要對線程池中建立的線程進行一些特殊的設置,如命名、優先級等,下面代碼咱們經過ThreadFactory對線程池中建立的線程進行記錄與命名

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義線程工廠
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("線程"+r.hashCode()+"建立");
                //線程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //輸出執行線程的名稱
        System.out.println("ThreadName:"+Thread.currentThread().getName());
    }
}

咱們看下輸出結果

線程118352462建立
線程1550089733建立
線程865113938建立
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
線程1442407170建立
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

能夠看到線程池中,每一個線程的建立咱們都進行了記錄輸出與命名。

4、ThreadPoolExecutor擴展

ThreadPoolExecutor擴展主要是圍繞beforeExecute()、afterExecute()和terminated()三個接口實現的,

一、beforeExecute:線程池中任務運行前執行

二、afterExecute:線程池中任務運行完畢後執行

三、terminated:線程池退出後執行

經過這三個接口咱們能夠監控每一個任務的開始和結束時間,或者其餘一些功能。下面咱們能夠經過代碼實現一下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //實現自定義接口
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("線程"+r.hashCode()+"建立");
                //線程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("準備執行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("執行完畢:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("線程池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //輸出執行線程的名稱
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

我看下輸出結果

線程118352462建立
線程1550089733建立
準備執行:Task0
準備執行:Task1
TaskNameTask0---ThreadName:threadPool118352462
線程865113938建立
執行完畢:Task0
TaskNameTask1---ThreadName:threadPool1550089733
執行完畢:Task1
準備執行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
執行完畢:Task3
準備執行:Task2
準備執行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
執行完畢:Task4
準備執行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
執行完畢:Task5
準備執行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
執行完畢:Task6
準備執行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
執行完畢:Task8
準備執行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
準備執行:Task7
執行完畢:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
執行完畢:Task7
執行完畢:Task2
線程池退出

能夠看到經過對beforeExecute()、afterExecute()和terminated()的實現,咱們對線程池中線程的運行狀態進行了監控,在其執行先後輸出了相關打印信息。另外使用shutdown方法能夠比較安全的關閉線程池, 當線程池調用該方法後,線程池中再也不接受後續添加的任務。可是,此時線程池不會馬上退出,直到添加到線程池中的任務都已經處理完成,纔會退出。

5、線程池線程數量

線程吃線程數量的設置沒有一個明確的指標,根據實際狀況,只要不是設置的偏大和偏小都問題不大,結合下面這個公式便可

            /**
             * Nthreads=CPU數量
             * Ucpu=目標CPU的使用率,0<=Ucpu<=1
             * W/C=任務等待時間與任務計算時間的比率
             */
            Nthreads = Ncpu*Ucpu*(1+W/C)

以上就是對ThreadPoolExecutor類從構造函數、拒絕策略、自定義線程建立等方面介紹了其詳細的使用方法,從而咱們能夠根據本身的須要,靈活配置和使用線程池建立線程,其中若有不足與不正確的地方還望指出與海涵。

 

關注微信公衆號,查看更多技術文章。

相關文章
相關標籤/搜索