java併發線程池---瞭解ThreadPoolExecutor就夠了

總結:線程池的特色是,在線程的數量=corePoolSize後,僅任務隊列滿了以後,纔會從任務隊列中取出一個任務,而後構造一個新的線程,循環往復直到線程數量達到maximumPoolSize執行拒絕策略。java

線程池-intsmaze

線程池的思想是:在系統中開闢一塊區域,其中存放一些待命的線程,這個區域被稱爲線程池。若是有須要執行的任務,則從線程池中借一個待命的線程來執行指定的任務,到任務結束能夠再將所借線程歸還。這樣就避免了大量重複建立線程對象,浪費CPU,內存資源。

自定義線程池-intsmaze

若是觀察jdk提供的各類線程池的源碼實現能夠發現,除了jdk8新增的線程池newWorkStealingPool之外,都是基於對ThreadPoolExecutor的封裝實現,因此首先講解ThreadPoolExecutor的具體功能。算法

ThreadPoolExecutor詳解-intsmaze

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize:指定線程池中線程數量編程

maximumPoolSize:最大線程數量併發

keepAliveTime:線程數量超過corePoolSize時,多於的空閒線程的存活時間(超過這段時間,該空閒線程會被銷燬)。性能

unit:keepAliveTime的時間單位優化

workQueue:任務隊列,提交可是未被執行的任務ui

threadFactory:建立線程的線程工廠,默認便可this

handler:拒絕策略,當任務太多來不及處理,如何拒絕任務,默認爲new AbortPolicy()策略。
        ExecutorService es = new ThreadPoolExecutor(3, 8, 60L,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r,
                            ThreadPoolExecutor executor) {
                        System.out.println("discard");
                    }
                });

任務隊列--存放runnable對象-intsmaze

總結:線程池的特色是,在線程的數量=corePoolSize後,僅任務隊列滿了以後,纔會從任務隊列中取出一個任務,而後構造一個新的線程,循環往復直到線程數量達到maximumPoolSize執行拒絕策略。
 
只要隊列實現BlockingQueue接口便可,注意ConcurrentLinkedQueue實現的最頂層的queue接口因此不能用在這裏
經常使用的有以下:
SynchronousQueue:直接提交隊列,該隊列沒有容量,每個插入操做都要等待一個相應的刪除操做,反之,每個刪除操做都要等待對應的插入操做。因此他不保存任務,老是將任務提交給線程執行,若是沒有空閒的線程,則建立新的線程,當線程數量達到最大,則執行拒絕策略。
 
ArrayBlockingQueue:有界任務隊列,線程池的線程數小於corePoolSize,則建立新的線程,大於corePoolSize,則將新的任務加入等待隊列。若等待隊列已滿,則在總線程不大於maximumPoolSize下,建立新的線程執行任務,大於maximumPoolSize則執行拒絕策略。
 
LinkedBlockingQueue:無界隊列,除非系統資源耗盡,不然不存在任務入隊失敗的狀況。線程池的線程數小於corePoolSize,則建立新的線程,大於corePoolSize,則將新的任務加入等待隊列。
 
PriorityBlockingQueue優先任務隊列,能夠控制任務的執行前後順序,是無界隊列。ArrayBlockingQueue,LinkedBlockingQueue都是按照先進先出算法處理任務的,PriorityBlockingQueue能夠根據任務自身的優先順序前後執行。

拒絕策略-intsmaze

線程池中的線程用完了,同時等待隊列中的任務已經塞滿了,再也塞不下新任務了,就須要拒絕策略:處理任務數量超過系統實際承受能力時,處理方式。
jdk內置四種拒絕策略:
AbortPolicy:直接拋出異常(默認策略),就算線程池有空閒了,後面的線程也沒法在運行了,要想後面的線程能夠運行,要捕獲異常信息。

CallerRunsPolicy:該策略直接在調用者線程中運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是任務提交線程的性能極有可能會急劇降低。
atom

DiscardOldestPolicy:將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。
spa

DiscardPolicy:默默丟棄沒法處理的任務,不予任何處理。若是容許任務丟失,這多是最好的一種解決方案。在線程池不空閒的時候,提交的任務都將丟棄,當有空閒的線程時提交的任務會執行。

下面是jdk的拒絕策略源碼-intsmaze

   public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        /**
         * 直接在調用者線程中運行當前被丟棄的任務,要注意這裏是調用Runnable的run()方法,而不是start()方法啓動線程,run()以普通方法的形式在主線程中執行任務,會阻塞
* 後面es.submit(new MyTask(i))方法的執行
*/ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }

總結:AbortPolicy策略下,咱們要catch異常,這樣咱們能夠捕獲到哪些任務被丟棄了。若是採用其餘的策略,丟棄的任務沒法定位的,只能經過下列程序中es.submit(new MyTask(i));任務以前打印該任務,運行任務的run()邏輯是,在打印任務信息,兩處日誌比對來定位哪些任務被丟棄了。

public class MyTask implements Runnable
{

    private int number;
    
    public MyTask(int number) {
        super();
        this.number = number;
    }

    public void run() {
        System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

    public static void main(String[] args)  {
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, 
//                new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
        
        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());
        for(int i=0;i<10000;i++)
        {
            try {
                System.out.println(i);
                es.submit(new MyTask(i));
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("------------------------"+i);
            }
        }
    }

線程池執行邏輯源碼解析-intsmaze

      public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
       /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *若是少於corePoolSize線程正在運行,首先嚐試用給定的命令啓動一個新的線程任務。 
         自動調用addWorker檢查runState和workerCount,
         
         
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *若是任務能夠成功排隊,那麼咱們仍然須要
           仔細檢查咱們是否應該添加一個線程
          (由於現有的自從上次檢查後死亡)或者那個
          自進入該方法以來,該池關閉。 因此咱們
          從新檢查狀態,若是有必要的話回滾隊列
          中止,或者若是沒有的話就開始一個新的線程。
         
         
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);//隊列滿了,執行拒絕策略
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);//這裏就是調用咱們傳入的拒絕策略對象的方法
    }
    
     /**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.
     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

常見線程池的構造方法-intsmaze

newFixedThreadPoo-intsmaze

任務隊列爲LinkedBlockingQueue中(長度無限),線程數量和最大線程數量相同。功能參考前面的任務隊列總結。

ExecutorService es=Executors.newFixedThreadPool(5);//參數同時指定線程池中線程數量爲5,最大線程數量爲5public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

 

newSingleThreadExecutor-intsmaze

任務隊列LinkedBlockingQueue中(長度無限),線程數量和最大線程數量均爲1。
ExecutorService es=Executors.newSingleThreadExecutor();//線程池中線程數量和最大線程數量均爲1.public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool-intsmaze

任務隊列爲SynchronousQueue,線程數量爲0,最大線程數量爲Integer.MAX_VALUE,因此只要有任務沒有空閒線程就會建立就新線程。
ExecutorService es=Executors.newCachedThreadPool();
//指定線程池中線程數量爲0,最大線程數量爲Integer.MAX_VALUE,任務隊列爲SynchronousQueuepublic static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

 

newScheduledThreadPool- -定時線程-intsmaze

任務隊列爲new DelayedWorkQueue(),返回的對象在ExecutorService接口上擴展了在指定時間執行某認爲的功能,在某個固定的延時以後執行或週期性執行某個任務。 

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}


public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

newSingleThreadScheduledExecutor- -定時線程-intsmaze

至關於newScheduledThreadPool(int corePoolSize)corePoolSize設置爲1。

ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();

延遲線程池

class MyScheduledTask implements Runnable
{
 private String tname;
 public MyScheduledTask(String tname)
 {
  this.tname=tname;
 }
 public void run()
 {
  System.out.println(tname+"任務時延2秒執行!!!");
 }
}
public class intsmaze
{
 public static void main(String[] args)
 {
  ScheduledExecutorService scheduledThreadPool
                       =Executors.newScheduledThreadPool(2);
  MyScheduledTask mt1=new MyScheduledTask("MT1");
  scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS);

 }
}

 

newWorkStealingPool java8新增鏈接池-intsmaze

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }//建立指定數量的線程池來執行給定的並行級別,還會使用多個隊列減小競爭
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }//前一個方法的簡化,若是當前機器有4個CPU,則目標的並行級別被設置爲4。

 

關閉線程池(不多使用,除了切換數據源時須要控制)-intsmaze

但願程序執行完全部任務後退出,調用ExecutorService接口中的shutdown(),shutdownNow()方法。
用完一個線程池後,應該調用該線程池的shutdown方法,將啓動線程池的關閉序列。調用shutdown方法後,線程池不在接收新的任務,可是會將之前全部已經提交的任務執行完。當線程池中的全部任務都執行完後,線程池中的全部線程都會死亡;shutdownNow方法會試圖中止全部正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。

線程池優化-intsmaze

通常來講肯定線程池的大小須要考慮CPU數量,內存大小,JDBC鏈接等因素。在《java併發編程實踐》一書中給出了一個估算線程池大小的經驗公式:
Ncpu=CPU的數量
Ucpu=目標CPU的使用率,0<=Ucpu<=1
W/C=等待時間與計算時間的比率
爲保持處理器達到指望的使用率,最優的線程池的大小等於:
Nthreads=Ncpu*Ucpu*(1+W/C)
在java中,能夠經過
Runtime.getRuntime().availableProcessors()

取得能夠CPU數量。

相關文章
相關標籤/搜索