Java併發之線程池的使用淺

背景

   當系統併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大下降系統的效率,由於頻繁建立線程和銷燬線程須要消耗大量的系統資源。java

  因此須要一個辦法使得線程能夠複用,即當線程執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務。在java中就能夠經過線程池來實現這樣的效果。本文講述了java中的線程池類以及如何使用線程池數組

1、java中的線程池ThreadPoolExecutor

   ThreadPoolExecutor是線程池中基礎類也是最爲核心的類。想要了解和合理使用線程池繞不開ThreadPoolExecutor類。下面介紹一下此類。
該類的構造函數以下
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
  //...
}

線程池有這麼幾個重要的參數緩存

  1. corePoolSize: 線程池裏的核心線程數量
  2. maximumPoolSize: 線程池裏容許有的最大線程數量
  3. keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。 默認狀況下,若是當前線程數量 > corePoolSize,多出來的線程會在keepAliveTime以後就被釋放掉,直到線程池中的線程數不大於corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0
  4. unit: keepAliveTime的時間單位,有7種單位
    • TimeUnit.DAYS;                           //
      TimeUnit.HOURS;                        //小時
      TimeUnit.MINUTES;                    //分鐘
      TimeUnit.SECONDS;                  //
      TimeUnit.MILLISECONDS;         //毫秒
      TimeUnit.MICROSECONDS;       //微妙
      TimeUnit.NANOSECONDS;        //
  5. workQueue: 隊列workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:
    1. 有界任務隊列ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小; 
    2. 無界任務隊列LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;
    3. 直接提交隊列synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。若是進程數量已經達到最大值,則執行拒絕策略。所以使用該隊列須要設置很大的maximumPoolSize,不然很容易執行拒絕策略。
  6. threadFactory: 每當須要建立新的線程放入線程池的時候,就是經過這個線程工廠來建立的
  7. handler: 就是說當線程,隊列都滿了,以後採起的策略,好比拋出異常等策略
    • ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
    • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,不作任何處理
    • ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務
    • ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

2、 線程池中重要的方法 

線程池有兩個重要的操做,提交任務和關閉線程池。在講述這兩個操做以前先了解一下線程池的狀態。注意!!!,線程池的狀態而不是線程狀態。併發

在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:app

//當前線程池的狀態,voliate 保證了線程之間的可見
volatile int runState;

//建立線程池後,初始時,線程池處於此狀態
static final int RUNNING    = 0;

/*調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢*/
static final int SHUTDOWN   = 1;

/*調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務*/
static final int STOP       = 2;

/*線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態*/
static final int TERMINATED = 3;

2.1 線程池提交任務

ThreadPoolExecutor的提交操做可使用submit和execute這兩種方法async

2.1.1 excute

 最核心的任務提交方法是execute()方法,雖然經過submit也能夠提交任務,可是實際上submit方法裏面最終調用的仍是execute()方法 而且ExecutorService中的invokeAll(),invokeAny()都是調用的execute方法。execute提交的任務無返回值,所以沒法判斷任務是否執行成功。可是若是出現線程錯誤能夠顯示部分異常堆棧信息ide

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();
 // 1.當前線程數量小於corePoolSize,則建立並啓動線程。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
        // 成功,則返回
                return;
            c = ctl.get();
        }
    // 2.步驟1建立線程失敗,則嘗試把任務加入阻塞隊列,
        if (isRunning(c) && workQueue.offer(command)) {
       // 入隊列成功,檢查線程池狀態,若是狀態部署RUNNING並且remove成功,則拒絕任務
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
       // 若是當前worker數量爲0,經過addWorker(null, false)建立一個線程,其任務爲null
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    // 3. 步驟1和2失敗,則嘗試將線程池的數量由corePoolSize擴充至maxPoolSize,若是失敗,則拒絕任務
        else if (!addWorker(command, false))
            reject(command);
    }
 }

詳細流程解讀函數

  1.   workerCountOf方法根據ctl的低29位,獲得線程池的當前線程數,若是線程數小於corePoolSize,則執行addWorker方法建立新的線程執行任務;不然執行步驟(2); 
  2.  若是線程池處於RUNNING狀態,且把提交的任務成功放入阻塞隊列中,則執行步驟(3),不然執行步驟(4)
  3.  再次檢查線程池的狀態,若是線程池沒有RUNNING,且成功從阻塞隊列中刪除任務,則執行reject方法處理任務
  4. 執行addWorker方法嘗試將線程池的數量由corePoolSize擴充至maxPoolSize,若是addWoker執行失敗,則執行reject方法處理任務;

2.1.2 sumbit

      若是使用submit 方法來提交任務,它會返回一個future,那麼咱們能夠經過這個future來判斷任務是否執行成功,經過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後當即返回,這時有可能任務沒有執行完。能夠經過如下方式改造submit得到部分異常堆棧信息性能

try {
 Future re = pools.submit(Task);
 re.get();

} catch (InterruptedException e) {

// 處理中斷異常

} catch (ExecutionException e) {

// 處理沒法執行任務異常

} finally {

// 關閉線程池

executor.shutdown();

}

2.2 線程池的關閉

使用線程池時,咱們能夠經過調用線程池的shutdown或shutdownNow方法來關閉線程池,可是它們的實現原理不一樣。this

    • shutdown的原理是隻是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。
    • shutdownNow的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。shutdownNow會首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表。

只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用shutdown來關閉線程池,若是任務不必定要執行完,則能夠調用shutdownNow。

3、線程池種類

常見的五種線程池

  • newFixedThreadPool:固定數量線程池,這個比較經常使用.能夠指定線程池的大小,該線程池corePoolSize和maximumPoolSize相等,阻塞隊列使用的是LinkedBlockingQueue,大小爲整數最大值
  • newSingleThreadExecutor: 單線程線程池,通常不多使用.
  • newCachedThreadPool:緩存線程池,緩存的線程默認存活60秒,線程的核心池corePoolSize大小爲0,核心池最大爲Integer.MAX_VALUE,阻塞隊列使用的是SynchronousQueue。
  • newScheduledThreadPool:定時線程池,該線程池可用於週期性地去執行任務,一般用於週期性的同步數據。
  • newWorkStealingPool:工做竊取線程池,該線程爲jdk1.8版新增。

 1. newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

這裏思考一個問題爲何newFixedThreadPool的corePoolSizemamximumPoolSize設計爲同樣的?

       答案能夠從execute的源碼中找到,首先線程池提交任務時是先判斷corePoolSize,再判斷workQueue,最後判斷mamximumPoolSize,然而LinkedBlockingQueue是無界隊列,因此它是達不到判斷mamximumPoolSize這一步的,因此mamximumPoolSize成多少,並無多大所謂。

下邊簡單的介紹一下newFixedThreadPool的使用

public class ThreadPoolDemo {

    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":ThreadID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {

        MyTask task = new MyTask();

        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; ++i) {
            es.submit(task);
        }
    }
}

      這裏建立了固定大小爲5的線程,而後依次向線程池提交了10個任務。此後線程池就會安排調度這10個任務。每一個任務都會將本身的執行時間和執行任務線程的Id打印出來,而且每個任務執行時間爲1秒

執行代碼,輸出以下

       能夠看出,前5個任務和後5個任務執行時間相差1秒,而且前五個和後五個ID是一致的。這說明任務是分兩個批次執行。這也符合一個只有5個線程的線程池行爲

 2. newCachedThreadPool

       該方法返回一個可根據實際狀況調整線程數量大小的線程池,線程池的數量不肯定,但如有空閒線程能夠複用,則會優先使用可複用的線程,若是全部線程均在工做,又有新的任務被提交,

則會建立新的線程執行任務。全部線程在當前任務執行完畢後,將返回線程池進行復用。

public static ExecutorService newCachedThreadPool() {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

        這裏能夠看到CachedThreadExecutormamximumPoolSize被設計成接近無限大。     

        緣由就是和synchronousQueue相關,上邊已經簡單介紹過該隊列了,該隊列的每一個 put 必須等待一個 take,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。若是 mamximumPoolSize不設計得很大,那麼就很容易拋出異常。
        因此,使用該線程池時,必定要注意控制併發的任務數,不然建立大量的線程可能致使嚴重的性能問題

3. newScheduledThreadPool

    計劃任務,和其餘線程池不一樣,該線程池並不必定會當即安排任務,主要是起計劃任務的做用。他會在指定時間、對任務進行調度。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory); }
  • scheduleAtFixedRate:是以固定的頻率去執行任務,週期是指每次執行任務成功執行之間的間隔。
  • schedultWithFixedDelay:是以固定的延時去執行任務,延時是指上一次執行成功以後和下一次開始執行的以前的時間

4. newSingleThreadExecutor

     單個線程線程池,只有一個線程的線程池,阻塞隊列使用的是LinkedBlockingQueue,如有多餘的任務提交到線程池中,則會被暫存到阻塞隊列,待空閒時再去執行。按照先入先出的順序執行任務。

public static ExecutorService newSingleThreadExecutor() {
       return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
       return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}

5. newWorkStealingPool

會根據所需的並行層次來動態建立一個擁有足夠的線程數目的線程池。經過使用多個隊列來下降競爭。並行的層次是和運行的最大線程數目相關。運行過程當中實際的線程數目或許會動態地增加和收縮。

 其本質是一個一個工做竊取的線程池,因此對於提交的任務不能保證是順序執行的。底層用的ForkJoinPool來實現的。ForkJoinPool的優點在於,能夠充分利用多cpu,多核cpu的優點,

把一個任務拆分紅多個「小任務」,把多個「小任務」放到多個處理器核心上並行執行;當多個「小任務」執行完成以後,再將這些執行結果合併起來便可。分治的思想。下面是newWorkStealingPool的構造函數

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool (parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
 }

//使用一個無限隊列來保存須要執行的任務,能夠傳入線程的數量,不傳入,則默認使用當前計算機中可用的cpu數量,使用分治法來解決問題,使用fork()和join()來進行調用
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,boolean asyncMode) {
        this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

  假設有3個線程A、B、C在運行,workStealing能夠簡單這麼認爲,每一個線程都維護本身的一個隊列,線程A的隊列裏頭積累了3個任務,線程B的隊列裏2個任務,C的隊列裏1個任務;那麼當線程C執行完任務以後,它會去別的線程池所維護的隊列裏面把任務偷過來繼續執行,主動的找活幹。

咱們看一下newWorkStealingPool的使用案例

/**
 * WorkStealingPool(任務竊取,都是守護線程)
 * 每一個線程都有要處理的隊列中的任務,若是其中的線程完成本身隊列中的任務,
 * 那麼它能夠去其餘線程中獲取其餘線程的任務去執行
 */
public class TestWorkStealingPool {

    public static void main(String[] args) throws IOException {
        // 根據cpu是幾核來開啓幾個線程
        ExecutorService service = Executors.newWorkStealingPool();
        // 查看當前計算機是幾核
        System.out.println(Runtime.getRuntime().availableProcessors());
        service.execute(new R(1000));
        service.execute(new R(3000));
        service.execute(new R(4000));
        service.execute(new R(2000));
        service.execute(new R(3000));
        service.execute(new R(3000));
        service.execute(new R(3000));
        service.execute(new R(3000));

        // WorkStealing是精靈線程(守護線程、後臺線程),主線程不阻塞,看不到輸出。
        // 虛擬機不中止,守護線程不中止
        System.in.read();
    }

    static class R implements Runnable {
        int time;

        public R(int time) {
            this.time = time;
        }

        @Override
        public void run() {
            System.out.println(time + ":" + Thread.currentThread().getName() + "執行時間爲:" + System.currentTimeMillis());
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

結果輸出 

   能夠看到newWorkStealingPool執行時根據cpu核心分配線程數量,這裏打印顯示cpu核心數爲4,明顯的能夠看出線程將前4 個任務扔給了1-4號線程,後四個任務在排隊等待。當1-4號線程中某個線程最早執行完畢後

會自動竊取未執行的任務,這裏能夠看到1號線程最早執行完畢用時1000ms,而後就竊取第五個任務。第0號線程執行完畢後,又緊接着執行第六個任務....。依次類推,直到全部任務執行完畢。

4、線程池的使用建議

  建議根據本身的須要手動建立線程池(new ThreadPoolExecutor(......)),這樣能夠靈活使用線程池,而且可能夠加深本身對線程池的理解。阿里巴巴開發手冊和Dubbo線程池的開發手冊也是這樣建議

Dubbo線程池的開發手冊以下

阿里巴巴開發手冊建議以下

咱們能夠看一下Dubbo怎麼建立線程池的

@SPI("fixed")
public interface ThreadPool {

    /**
     * 線程池
     * 
     * @param url 線程參數
     * @return 線程池
     */
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                            : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

    默認狀況下,Dubbo的FixedThreadPool中,maximumPoolSize = 200,隊列是容量很小的SynchronousQueue.因此當線程超過200的時候,線程池就會拋出異常.

總結

     建議根據本身的須要手動建立線程池,也能夠根據本身的須要實現ThreadFactory接口,從而切合實際使用中的項目

相關文章
相關標籤/搜索