Consolidate and Move Forward: Analyzing Thread Pools in Java


1. Introduction

   In a multithreaded environment, if each task is short-lived but a very large number of threads are started, most of the running time ends up wasted on creating and destroying threads. So is there a way for a thread to be reused after it finishes its own task? Thread pools exist precisely to solve this problem. By now we know plenty of other pools as well, for example the IP address pools used by NAT, or caches: in essence they all reuse resources that have already been created, reducing the demand for new ones, which eases the pressure on memory and CPU and speeds up execution.

2. Basic Understanding of Thread Pools

  2.1 The Concept of a Thread Pool

    Asynchronous execution with multiple threads can exploit the full computing power of a multi-core machine, but left uncontrolled it becomes a burden on the system. Each thread occupies memory, so a large number of threads consumes memory and can even lead to an OutOfMemoryError; even short of that, reclaiming large numbers of threads puts considerable pressure on the GC. To avoid creating threads over and over, a thread pool allows threads to be reused: when work arrives, a thread is taken from the pool, and when the work is finished the thread is not shut down but handed back to the pool for other tasks to use.
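    To make the "borrow a thread, hand it back" idea concrete, here is a minimal sketch (the class name ReuseDemo and the pool size are ours, not from the original article) built on Executors.newFixedThreadPool; the printed thread names show the same two workers serving all six tasks.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReuseDemo {
    public static void main(String[] args) {
        // A pool with 2 worker threads: the 6 tasks below are all served
        // by those same 2 threads instead of 6 freshly created ones.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 6; i++) {
            final int id = i;
            pool.execute(() -> System.out.println(
                    "task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks; workers exit once the queue drains
    }
}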

 

    Executor is the top-level interface. It declares a single method, execute(Runnable), which returns void and takes a Runnable; as the name suggests, it runs the task that is passed in.
  The ExecutorService interface then extends Executor and declares further methods: submit, invokeAll, invokeAny, shutdown, and so on.
  The abstract class AbstractExecutorService implements ExecutorService and provides default implementations of essentially all the methods it declares.
  Finally, ThreadPoolExecutor extends AbstractExecutorService.
  ThreadPoolExecutor has several particularly important methods:

execute()
submit()
shutdown()
shutdownNow()

   execute() is declared in Executor and given its concrete implementation in ThreadPoolExecutor. It is the core method of ThreadPoolExecutor: it hands a task over to the pool for execution.
  submit() is declared in ExecutorService and already implemented in AbstractExecutorService; ThreadPoolExecutor does not override it. It also submits a task to the pool, but unlike execute() it can return the task's result. If you look at its implementation, it still calls execute() underneath; it simply wraps the task so that a Future can be used to fetch the result.
  shutdown() and shutdownNow() are used to shut the pool down.
  There are many other methods as well, such as getQueue(), getPoolSize(), getActiveCount(), and getCompletedTaskCount(), which expose the pool's properties.
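  The difference between the two submission methods is easiest to see side by side. A small sketch (the class name SubmitVsExecute is ours): execute() takes a Runnable and returns nothing, while submit() returns a Future whose get() delivers the Callable's result, or rethrows its exception.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitVsExecute {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // execute(): fire-and-forget, no handle on the outcome
        pool.execute(() -> System.out.println("ran via execute()"));

        // submit(): wraps the task in a FutureTask and returns a Future,
        // so the caller can block for the result (or the exception)
        Future<Integer> answer = pool.submit(() -> 21 * 2);
        System.out.println("submit() result: " + answer.get()); // prints 42

        pool.shutdown();
    }
}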

  2.2 Analyzing the Thread Pool Source Code

   The java.util.concurrent.ThreadPoolExecutor class is the core of the thread-pool machinery, so a thorough understanding of Java thread pools has to start with this class.

   Let's start with an example:

package com.threadpool.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("threads in pool: " + executor.getPoolSize()
                    + ", tasks waiting in queue: " + executor.getQueue().size()
                    + ", completed tasks: " + executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
}

class MyTask implements Runnable {
    private int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    public void run() {
        System.out.println("running task " + taskNum);
        try {
            Thread.sleep(4000); // sleep() is static, so call it directly instead of via currentThread()
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task " + taskNum + " finished");
    }
}

  Sample output:

threads in pool: 1, tasks waiting in queue: 0, completed tasks: 0
threads in pool: 2, tasks waiting in queue: 0, completed tasks: 0
threads in pool: 3, tasks waiting in queue: 0, completed tasks: 0
threads in pool: 4, tasks waiting in queue: 0, completed tasks: 0
threads in pool: 5, tasks waiting in queue: 0, completed tasks: 0
running task 4
running task 3
running task 2
running task 1
threads in pool: 5, tasks waiting in queue: 1, completed tasks: 0
threads in pool: 5, tasks waiting in queue: 2, completed tasks: 0
threads in pool: 5, tasks waiting in queue: 3, completed tasks: 0
threads in pool: 5, tasks waiting in queue: 4, completed tasks: 0
threads in pool: 5, tasks waiting in queue: 5, completed tasks: 0
threads in pool: 6, tasks waiting in queue: 5, completed tasks: 0
threads in pool: 7, tasks waiting in queue: 5, completed tasks: 0
threads in pool: 8, tasks waiting in queue: 5, completed tasks: 0
threads in pool: 9, tasks waiting in queue: 5, completed tasks: 0
threads in pool: 10, tasks waiting in queue: 5, completed tasks: 0
running task 0
running task 10
running task 11
running task 12
running task 13
running task 14
task 2 finished
task 3 finished
running task 5
running task 6
task 4 finished
running task 7
task 1 finished
running task 8
task 0 finished
running task 9
task 11 finished
task 10 finished
task 14 finished
task 13 finished
task 12 finished
task 6 finished
task 5 finished
task 8 finished
task 7 finished
task 9 finished
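  In this run, the first five tasks each cause a new core thread to be created; the next five wait in the ArrayBlockingQueue (capacity 5); and the last five force the pool to grow from corePoolSize = 5 toward maximumPoolSize = 10. If a sixteenth task arrived while the pool was saturated, the default AbortPolicy would reject it with a RejectedExecutionException. The following minimal sketch (the class name RejectionDemo and the tiny pool sizes are ours) saturates a pool on purpose and swaps in CallerRunsPolicy so the overflow task runs in the caller instead of being rejected.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                // With the default AbortPolicy the third execute() below would throw
                // RejectedExecutionException; CallerRunsPolicy runs it on the caller.
                new ThreadPoolExecutor.CallerRunsPolicy());

        Runnable sleepy = () -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("done on " + Thread.currentThread().getName());
        };

        executor.execute(sleepy); // taken by the single worker thread
        executor.execute(sleepy); // parked in the bounded queue
        executor.execute(sleepy); // pool saturated -> runs on the main thread
        executor.shutdown();
    }
}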

  Now let's turn to ThreadPoolExecutor itself. One detail worth keeping in mind while reading the source: the pool packs its run state and its worker count into a single AtomicInteger named ctl, with the state in the high three bits and the count in the low 29 bits.
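  Before the full listing, here is a standalone sketch (the class name CtlDemo is ours; the constants and accessors are copied from the source below) so you can experiment with that encoding yourself.

public class CtlDemo {
    // Run state lives in the high 3 bits, worker count in the low 29 bits.
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // maximum worker count

    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    static int ctlOf(int rs, int wc) { return rs | wc; }
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 12);                   // a RUNNING pool with 12 workers
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 12
    }
}

  With that encoding in mind, here is the JDK source of ThreadPoolExecutor: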

   1 /*
   2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
   3  *
   4  *
   5  *
   6  *
   7  *
   8  *
   9  *
  10  *
  11  *
  12  *
  13  *
  14  *
  15  *
  16  *
  17  *
  18  *
  19  *
  20  *
  21  *
  22  *
  23  */
  24 
  25 /*
  26  *
  27  *
  28  *
  29  *
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  38 import java.util.concurrent.locks.Condition;
  39 import java.util.concurrent.locks.ReentrantLock;
  40 import java.util.concurrent.atomic.AtomicInteger;
  41 import java.util.*;
  42 
  43 /**
  44  * An {@link ExecutorService} that executes each submitted task using
  45  * one of possibly several pooled threads, normally configured
  46  * using {@link Executors} factory methods.
  47  *
  48  * <p>Thread pools address two different problems: they usually
  49  * provide improved performance when executing large numbers of
  50  * asynchronous tasks, due to reduced per-task invocation overhead,
  51  * and they provide a means of bounding and managing the resources,
  52  * including threads, consumed when executing a collection of tasks.
  53  * Each {@code ThreadPoolExecutor} also maintains some basic
  54  * statistics, such as the number of completed tasks.
  55  *
  56  * <p>To be useful across a wide range of contexts, this class
  57  * provides many adjustable parameters and extensibility
  58  * hooks. However, programmers are urged to use the more convenient
  59  * {@link Executors} factory methods {@link
  60  * Executors#newCachedThreadPool} (unbounded thread pool, with
  61  * automatic thread reclamation), {@link Executors#newFixedThreadPool}
  62  * (fixed size thread pool) and {@link
  63  * Executors#newSingleThreadExecutor} (single background thread), that
  64  * preconfigure settings for the most common usage
  65  * scenarios. Otherwise, use the following guide when manually
  66  * configuring and tuning this class:
  67  *
  68  * <dl>
  69  *
  70  * <dt>Core and maximum pool sizes</dt>
  71  *
  72  * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
  73  * pool size (see {@link #getPoolSize})
  74  * according to the bounds set by
  75  * corePoolSize (see {@link #getCorePoolSize}) and
  76  * maximumPoolSize (see {@link #getMaximumPoolSize}).
  77  *
  78  * When a new task is submitted in method {@link #execute(Runnable)},
  79  * and fewer than corePoolSize threads are running, a new thread is
  80  * created to handle the request, even if other worker threads are
  81  * idle.  If there are more than corePoolSize but less than
  82  * maximumPoolSize threads running, a new thread will be created only
  83  * if the queue is full.  By setting corePoolSize and maximumPoolSize
  84  * the same, you create a fixed-size thread pool. By setting
  85  * maximumPoolSize to an essentially unbounded value such as {@code
  86  * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
  87  * number of concurrent tasks. Most typically, core and maximum pool
  88  * sizes are set only upon construction, but they may also be changed
  89  * dynamically using {@link #setCorePoolSize} and {@link
  90  * #setMaximumPoolSize}. </dd>
  91  *
  92  * <dt>On-demand construction</dt>
  93  *
  94  * <dd>By default, even core threads are initially created and
  95  * started only when new tasks arrive, but this can be overridden
  96  * dynamically using method {@link #prestartCoreThread} or {@link
  97  * #prestartAllCoreThreads}.  You probably want to prestart threads if
  98  * you construct the pool with a non-empty queue. </dd>
  99  *
 100  * <dt>Creating new threads</dt>
 101  *
 102  * <dd>New threads are created using a {@link ThreadFactory}.  If not
 103  * otherwise specified, a {@link Executors#defaultThreadFactory} is
 104  * used, that creates threads to all be in the same {@link
 105  * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
 106  * non-daemon status. By supplying a different ThreadFactory, you can
 107  * alter the thread's name, thread group, priority, daemon status,
 108  * etc. If a {@code ThreadFactory} fails to create a thread when asked
 109  * by returning null from {@code newThread}, the executor will
 110  * continue, but might not be able to execute any tasks. Threads
 111  * should possess the "modifyThread" {@code RuntimePermission}. If
 112  * worker threads or other threads using the pool do not possess this
 113  * permission, service may be degraded: configuration changes may not
 114  * take effect in a timely manner, and a shutdown pool may remain in a
 115  * state in which termination is possible but not completed.</dd>
 116  *
 117  * <dt>Keep-alive times</dt>
 118  *
 119  * <dd>If the pool currently has more than corePoolSize threads,
 120  * excess threads will be terminated if they have been idle for more
 121  * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
 122  * This provides a means of reducing resource consumption when the
 123  * pool is not being actively used. If the pool becomes more active
 124  * later, new threads will be constructed. This parameter can also be
 125  * changed dynamically using method {@link #setKeepAliveTime(long,
 126  * TimeUnit)}.  Using a value of {@code Long.MAX_VALUE} {@link
 127  * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
 128  * terminating prior to shut down. By default, the keep-alive policy
 129  * applies only when there are more than corePoolSize threads. But
 130  * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
 131  * apply this time-out policy to core threads as well, so long as the
 132  * keepAliveTime value is non-zero. </dd>
 133  *
 134  * <dt>Queuing</dt>
 135  *
 136  * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 137  * submitted tasks.  The use of this queue interacts with pool sizing:
 138  *
 139  * <ul>
 140  *
 141  * <li> If fewer than corePoolSize threads are running, the Executor
 142  * always prefers adding a new thread
 143  * rather than queuing.</li>
 144  *
 145  * <li> If corePoolSize or more threads are running, the Executor
 146  * always prefers queuing a request rather than adding a new
 147  * thread.</li>
 148  *
 149  * <li> If a request cannot be queued, a new thread is created unless
 150  * this would exceed maximumPoolSize, in which case, the task will be
 151  * rejected.</li>
 152  *
 153  * </ul>
 154  *
 155  * There are three general strategies for queuing:
 156  * <ol>
 157  *
 158  * <li> <em> Direct handoffs.</em> A good default choice for a work
 159  * queue is a {@link SynchronousQueue} that hands off tasks to threads
 160  * without otherwise holding them. Here, an attempt to queue a task
 161  * will fail if no threads are immediately available to run it, so a
 162  * new thread will be constructed. This policy avoids lockups when
 163  * handling sets of requests that might have internal dependencies.
 164  * Direct handoffs generally require unbounded maximumPoolSizes to
 165  * avoid rejection of new submitted tasks. This in turn admits the
 166  * possibility of unbounded thread growth when commands continue to
 167  * arrive on average faster than they can be processed.  </li>
 168  *
 169  * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 170  * example a {@link LinkedBlockingQueue} without a predefined
 171  * capacity) will cause new tasks to wait in the queue when all
 172  * corePoolSize threads are busy. Thus, no more than corePoolSize
 173  * threads will ever be created. (And the value of the maximumPoolSize
 174  * therefore doesn't have any effect.)  This may be appropriate when
 175  * each task is completely independent of others, so tasks cannot
 176  * affect each others execution; for example, in a web page server.
 177  * While this style of queuing can be useful in smoothing out
 178  * transient bursts of requests, it admits the possibility of
 179  * unbounded work queue growth when commands continue to arrive on
 180  * average faster than they can be processed.  </li>
 181  *
 182  * <li><em>Bounded queues.</em> A bounded queue (for example, an
 183  * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 184  * used with finite maximumPoolSizes, but can be more difficult to
 185  * tune and control.  Queue sizes and maximum pool sizes may be traded
 186  * off for each other: Using large queues and small pools minimizes
 187  * CPU usage, OS resources, and context-switching overhead, but can
 188  * lead to artificially low throughput.  If tasks frequently block (for
 189  * example if they are I/O bound), a system may be able to schedule
 190  * time for more threads than you otherwise allow. Use of small queues
 191  * generally requires larger pool sizes, which keeps CPUs busier but
 192  * may encounter unacceptable scheduling overhead, which also
 193  * decreases throughput.  </li>
 194  *
 195  * </ol>
 196  *
 197  * </dd>
 198  *
 199  * <dt>Rejected tasks</dt>
 200  *
 201  * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 202  * <em>rejected</em> when the Executor has been shut down, and also when
 203  * the Executor uses finite bounds for both maximum threads and work queue
 204  * capacity, and is saturated.  In either case, the {@code execute} method
 205  * invokes the {@link
 206  * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 207  * method of its {@link RejectedExecutionHandler}.  Four predefined handler
 208  * policies are provided:
 209  *
 210  * <ol>
 211  *
 212  * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 213  * handler throws a runtime {@link RejectedExecutionException} upon
 214  * rejection. </li>
 215  *
 216  * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 217  * that invokes {@code execute} itself runs the task. This provides a
 218  * simple feedback control mechanism that will slow down the rate that
 219  * new tasks are submitted. </li>
 220  *
 221  * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 222  * cannot be executed is simply dropped.  </li>
 223  *
 224  * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 225  * executor is not shut down, the task at the head of the work queue
 226  * is dropped, and then execution is retried (which can fail again,
 227  * causing this to be repeated.) </li>
 228  *
 229  * </ol>
 230  *
 231  * It is possible to define and use other kinds of {@link
 232  * RejectedExecutionHandler} classes. Doing so requires some care
 233  * especially when policies are designed to work only under particular
 234  * capacity or queuing policies. </dd>
 235  *
 236  * <dt>Hook methods</dt>
 237  *
 238  * <dd>This class provides {@code protected} overridable
 239  * {@link #beforeExecute(Thread, Runnable)} and
 240  * {@link #afterExecute(Runnable, Throwable)} methods that are called
 241  * before and after execution of each task.  These can be used to
 242  * manipulate the execution environment; for example, reinitializing
 243  * ThreadLocals, gathering statistics, or adding log entries.
 244  * Additionally, method {@link #terminated} can be overridden to perform
 245  * any special processing that needs to be done once the Executor has
 246  * fully terminated.
 247  *
 248  * <p>If hook or callback methods throw exceptions, internal worker
 249  * threads may in turn fail and abruptly terminate.</dd>
 250  *
 251  * <dt>Queue maintenance</dt>
 252  *
 253  * <dd>Method {@link #getQueue()} allows access to the work queue
 254  * for purposes of monitoring and debugging.  Use of this method for
 255  * any other purpose is strongly discouraged.  Two supplied methods,
 256  * {@link #remove(Runnable)} and {@link #purge} are available to
 257  * assist in storage reclamation when large numbers of queued tasks
 258  * become cancelled.</dd>
 259  *
 260  * <dt>Finalization</dt>
 261  *
 262  * <dd>A pool that is no longer referenced in a program <em>AND</em>
 263  * has no remaining threads will be {@code shutdown} automatically. If
 264  * you would like to ensure that unreferenced pools are reclaimed even
 265  * if users forget to call {@link #shutdown}, then you must arrange
 266  * that unused threads eventually die, by setting appropriate
 267  * keep-alive times, using a lower bound of zero core threads and/or
 268  * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
 269  *
 270  * </dl>
 271  *
 272  * <p><b>Extension example</b>. Most extensions of this class
 273  * override one or more of the protected hook methods. For example,
 274  * here is a subclass that adds a simple pause/resume feature:
 275  *
 276  *  <pre> {@code
 277  * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 278  *   private boolean isPaused;
 279  *   private ReentrantLock pauseLock = new ReentrantLock();
 280  *   private Condition unpaused = pauseLock.newCondition();
 281  *
 282  *   public PausableThreadPoolExecutor(...) { super(...); }
 283  *
 284  *   protected void beforeExecute(Thread t, Runnable r) {
 285  *     super.beforeExecute(t, r);
 286  *     pauseLock.lock();
 287  *     try {
 288  *       while (isPaused) unpaused.await();
 289  *     } catch (InterruptedException ie) {
 290  *       t.interrupt();
 291  *     } finally {
 292  *       pauseLock.unlock();
 293  *     }
 294  *   }
 295  *
 296  *   public void pause() {
 297  *     pauseLock.lock();
 298  *     try {
 299  *       isPaused = true;
 300  *     } finally {
 301  *       pauseLock.unlock();
 302  *     }
 303  *   }
 304  *
 305  *   public void resume() {
 306  *     pauseLock.lock();
 307  *     try {
 308  *       isPaused = false;
 309  *       unpaused.signalAll();
 310  *     } finally {
 311  *       pauseLock.unlock();
 312  *     }
 313  *   }
 314  * }}</pre>
 315  *
 316  * @since 1.5
 317  * @author Doug Lea
 318  */
 319 public class ThreadPoolExecutor extends AbstractExecutorService {
 320     /**
 321      * The main pool control state, ctl, is an atomic integer packing
 322      * two conceptual fields
 323      *   workerCount, indicating the effective number of threads
 324      *   runState,    indicating whether running, shutting down etc
 325      *
 326      * In order to pack them into one int, we limit workerCount to
 327      * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
 328      * billion) otherwise representable. If this is ever an issue in
 329      * the future, the variable can be changed to be an AtomicLong,
 330      * and the shift/mask constants below adjusted. But until the need
 331      * arises, this code is a bit faster and simpler using an int.
 332      *
 333      * The workerCount is the number of workers that have been
 334      * permitted to start and not permitted to stop.  The value may be
 335      * transiently different from the actual number of live threads,
 336      * for example when a ThreadFactory fails to create a thread when
 337      * asked, and when exiting threads are still performing
 338      * bookkeeping before terminating. The user-visible pool size is
 339      * reported as the current size of the workers set.
 340      *
 341      * The runState provides the main lifecycle control, taking on values:
 342      *
 343      *   RUNNING:  Accept new tasks and process queued tasks
 344      *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 345      *   STOP:     Don't accept new tasks, don't process queued tasks,
 346      *             and interrupt in-progress tasks
 347      *   TIDYING:  All tasks have terminated, workerCount is zero,
 348      *             the thread transitioning to state TIDYING
 349      *             will run the terminated() hook method
 350      *   TERMINATED: terminated() has completed
 351      *
 352      * The numerical order among these values matters, to allow
 353      * ordered comparisons. The runState monotonically increases over
 354      * time, but need not hit each state. The transitions are:
 355      *
 356      * RUNNING -> SHUTDOWN
 357      *    On invocation of shutdown(), perhaps implicitly in finalize()
 358      * (RUNNING or SHUTDOWN) -> STOP
 359      *    On invocation of shutdownNow()
 360      * SHUTDOWN -> TIDYING
 361      *    When both queue and pool are empty
 362      * STOP -> TIDYING
 363      *    When pool is empty
 364      * TIDYING -> TERMINATED
 365      *    When the terminated() hook method has completed
 366      *
 367      * Threads waiting in awaitTermination() will return when the
 368      * state reaches TERMINATED.
 369      *
 370      * Detecting the transition from SHUTDOWN to TIDYING is less
 371      * straightforward than you'd like because the queue may become
 372      * empty after non-empty and vice versa during SHUTDOWN state, but
 373      * we can only terminate if, after seeing that it is empty, we see
 374      * that workerCount is 0 (which sometimes entails a recheck -- see
 375      * below).
 376      */
 377     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 378     private static final int COUNT_BITS = Integer.SIZE - 3;
 379     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 380 
 381     // runState is stored in the high-order bits
 382     private static final int RUNNING    = -1 << COUNT_BITS;
 383     private static final int SHUTDOWN   =  0 << COUNT_BITS;
 384     private static final int STOP       =  1 << COUNT_BITS;
 385     private static final int TIDYING    =  2 << COUNT_BITS;
 386     private static final int TERMINATED =  3 << COUNT_BITS;
 387 
 388     // Packing and unpacking ctl
 389     private static int runStateOf(int c)     { return c & ~CAPACITY; }
 390     private static int workerCountOf(int c)  { return c & CAPACITY; }
 391     private static int ctlOf(int rs, int wc) { return rs | wc; }
 392 
 393     /*
 394      * Bit field accessors that don't require unpacking ctl.
 395      * These depend on the bit layout and on workerCount being never negative.
 396      */
 397 
 398     private static boolean runStateLessThan(int c, int s) {
 399         return c < s;
 400     }
 401 
 402     private static boolean runStateAtLeast(int c, int s) {
 403         return c >= s;
 404     }
 405 
 406     private static boolean isRunning(int c) {
 407         return c < SHUTDOWN;
 408     }
 409 
 410     /**
 411      * Attempts to CAS-increment the workerCount field of ctl.
 412      */
 413     private boolean compareAndIncrementWorkerCount(int expect) {
 414         return ctl.compareAndSet(expect, expect + 1);
 415     }
 416 
 417     /**
 418      * Attempts to CAS-decrement the workerCount field of ctl.
 419      */
 420     private boolean compareAndDecrementWorkerCount(int expect) {
 421         return ctl.compareAndSet(expect, expect - 1);
 422     }
 423 
 424     /**
 425      * Decrements the workerCount field of ctl. This is called only on
 426      * abrupt termination of a thread (see processWorkerExit). Other
 427      * decrements are performed within getTask.
 428      */
 429     private void decrementWorkerCount() {
 430         do {} while (! compareAndDecrementWorkerCount(ctl.get()));
 431     }
 432 
 433     /**
 434      * The queue used for holding tasks and handing off to worker
 435      * threads.  We do not require that workQueue.poll() returning
 436      * null necessarily means that workQueue.isEmpty(), so rely
 437      * solely on isEmpty to see if the queue is empty (which we must
 438      * do for example when deciding whether to transition from
 439      * SHUTDOWN to TIDYING).  This accommodates special-purpose
 440      * queues such as DelayQueues for which poll() is allowed to
 441      * return null even if it may later return non-null when delays
 442      * expire.
 443      */
 444     private final BlockingQueue<Runnable> workQueue;
 445 
 446     /**
 447      * Lock held on access to workers set and related bookkeeping.
 448      * While we could use a concurrent set of some sort, it turns out
 449      * to be generally preferable to use a lock. Among the reasons is
 450      * that this serializes interruptIdleWorkers, which avoids
 451      * unnecessary interrupt storms, especially during shutdown.
 452      * Otherwise exiting threads would concurrently interrupt those
 453      * that have not yet interrupted. It also simplifies some of the
 454      * associated statistics bookkeeping of largestPoolSize etc. We
 455      * also hold mainLock on shutdown and shutdownNow, for the sake of
 456      * ensuring workers set is stable while separately checking
 457      * permission to interrupt and actually interrupting.
 458      */
 459     private final ReentrantLock mainLock = new ReentrantLock();
 460 
 461     /**
 462      * Set containing all worker threads in pool. Accessed only when
 463      * holding mainLock.
 464      */
 465     private final HashSet<Worker> workers = new HashSet<Worker>();
 466 
 467     /**
 468      * Wait condition to support awaitTermination
 469      */
 470     private final Condition termination = mainLock.newCondition();
 471 
 472     /**
 473      * Tracks largest attained pool size. Accessed only under
 474      * mainLock.
 475      */
 476     private int largestPoolSize;
 477 
 478     /**
 479      * Counter for completed tasks. Updated only on termination of
 480      * worker threads. Accessed only under mainLock.
 481      */
 482     private long completedTaskCount;
 483 
 484     /*
 485      * All user control parameters are declared as volatiles so that
 486      * ongoing actions are based on freshest values, but without need
 487      * for locking, since no internal invariants depend on them
 488      * changing synchronously with respect to other actions.
 489      */
 490 
 491     /**
 492      * Factory for new threads. All threads are created using this
 493      * factory (via method addWorker).  All callers must be prepared
 494      * for addWorker to fail, which may reflect a system or user's
 495      * policy limiting the number of threads.  Even though it is not
 496      * treated as an error, failure to create threads may result in
 497      * new tasks being rejected or existing ones remaining stuck in
 498      * the queue.
 499      *
 500      * We go further and preserve pool invariants even in the face of
 501      * errors such as OutOfMemoryError, that might be thrown while
 502      * trying to create threads.  Such errors are rather common due to
 503      * the need to allocate a native stack in Thread.start, and users
 504      * will want to perform clean pool shutdown to clean up.  There
 505      * will likely be enough memory available for the cleanup code to
 506      * complete without encountering yet another OutOfMemoryError.
 507      */
 508     private volatile ThreadFactory threadFactory;
 509 
 510     /**
 511      * Handler called when saturated or shutdown in execute.
 512      */
 513     private volatile RejectedExecutionHandler handler;
 514 
 515     /**
 516      * Timeout in nanoseconds for idle threads waiting for work.
 517      * Threads use this timeout when there are more than corePoolSize
 518      * present or if allowCoreThreadTimeOut. Otherwise they wait
 519      * forever for new work.
 520      */
 521     private volatile long keepAliveTime;
 522 
 523     /**
 524      * If false (default), core threads stay alive even when idle.
 525      * If true, core threads use keepAliveTime to time out waiting
 526      * for work.
 527      */
 528     private volatile boolean allowCoreThreadTimeOut;
 529 
 530     /**
 531      * Core pool size is the minimum number of workers to keep alive
 532      * (and not allow to time out etc) unless allowCoreThreadTimeOut
 533      * is set, in which case the minimum is zero.
 534      */
 535     private volatile int corePoolSize;
 536 
 537     /**
 538      * Maximum pool size. Note that the actual maximum is internally
 539      * bounded by CAPACITY.
 540      */
 541     private volatile int maximumPoolSize;
 542 
 543     /**
 544      * The default rejected execution handler
 545      */
 546     private static final RejectedExecutionHandler defaultHandler =
 547         new AbortPolicy();
 548 
 549     /**
 550      * Permission required for callers of shutdown and shutdownNow.
 551      * We additionally require (see checkShutdownAccess) that callers
 552      * have permission to actually interrupt threads in the worker set
 553      * (as governed by Thread.interrupt, which relies on
 554      * ThreadGroup.checkAccess, which in turn relies on
 555      * SecurityManager.checkAccess). Shutdowns are attempted only if
 556      * these checks pass.
 557      *
 558      * All actual invocations of Thread.interrupt (see
 559      * interruptIdleWorkers and interruptWorkers) ignore
 560      * SecurityExceptions, meaning that the attempted interrupts
 561      * silently fail. In the case of shutdown, they should not fail
 562      * unless the SecurityManager has inconsistent policies, sometimes
 563      * allowing access to a thread and sometimes not. In such cases,
 564      * failure to actually interrupt threads may disable or delay full
 565      * termination. Other uses of interruptIdleWorkers are advisory,
 566      * and failure to actually interrupt will merely delay response to
 567      * configuration changes so is not handled exceptionally.
 568      */
 569     private static final RuntimePermission shutdownPerm =
 570         new RuntimePermission("modifyThread");
 571 
 572     /**
 573      * Class Worker mainly maintains interrupt control state for
 574      * threads running tasks, along with other minor bookkeeping.
 575      * This class opportunistically extends AbstractQueuedSynchronizer
 576      * to simplify acquiring and releasing a lock surrounding each
 577      * task execution.  This protects against interrupts that are
 578      * intended to wake up a worker thread waiting for a task from
 579      * instead interrupting a task being run.  We implement a simple
 580      * non-reentrant mutual exclusion lock rather than use
 581      * ReentrantLock because we do not want worker tasks to be able to
 582      * reacquire the lock when they invoke pool control methods like
 583      * setCorePoolSize.  Additionally, to suppress interrupts until
 584      * the thread actually starts running tasks, we initialize lock
 585      * state to a negative value, and clear it upon start (in
 586      * runWorker).
 587      */
 588     private final class Worker
 589         extends AbstractQueuedSynchronizer
 590         implements Runnable
 591     {
 592         /**
 593          * This class will never be serialized, but we provide a
 594          * serialVersionUID to suppress a javac warning.
 595          */
 596         private static final long serialVersionUID = 6138294804551838833L;
 597 
 598         /** Thread this worker is running in.  Null if factory fails. */
 599         final Thread thread;
 600         /** Initial task to run.  Possibly null. */
 601         Runnable firstTask;
 602         /** Per-thread task counter */
 603         volatile long completedTasks;
 604 
 605         /**
 606          * Creates with given first task and thread from ThreadFactory.
 607          * @param firstTask the first task (null if none)
 608          */
 609         Worker(Runnable firstTask) {
 610             setState(-1); // inhibit interrupts until runWorker
 611             this.firstTask = firstTask;
 612             this.thread = getThreadFactory().newThread(this);
 613         }
 614 
 615         /** Delegates main run loop to outer runWorker  */
 616         public void run() {
 617             runWorker(this);
 618         }
 619 
 620         // Lock methods
 621         //
 622         // The value 0 represents the unlocked state.
 623         // The value 1 represents the locked state.
 624 
 625         protected boolean isHeldExclusively() {
 626             return getState() != 0;
 627         }
 628 
 629         protected boolean tryAcquire(int unused) {
 630             if (compareAndSetState(0, 1)) {
 631                 setExclusiveOwnerThread(Thread.currentThread());
 632                 return true;
 633             }
 634             return false;
 635         }
 636 
 637         protected boolean tryRelease(int unused) {
 638             setExclusiveOwnerThread(null);
 639             setState(0);
 640             return true;
 641         }
 642 
 643         public void lock()        { acquire(1); }
 644         public boolean tryLock()  { return tryAcquire(1); }
 645         public void unlock()      { release(1); }
 646         public boolean isLocked() { return isHeldExclusively(); }
 647 
 648         void interruptIfStarted() {
 649             Thread t;
 650             if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
 651                 try {
 652                     t.interrupt();
 653                 } catch (SecurityException ignore) {
 654                 }
 655             }
 656         }
 657     }
 658 
 659     /*
 660      * Methods for setting control state
 661      */
 662 
 663     /**
 664      * Transitions runState to given target, or leaves it alone if
 665      * already at least the given target.
 666      *
 667      * @param targetState the desired state, either SHUTDOWN or STOP
 668      *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
 669      */
 670     private void advanceRunState(int targetState) {
 671         for (;;) {
 672             int c = ctl.get();
 673             if (runStateAtLeast(c, targetState) ||
 674                 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
 675                 break;
 676         }
 677     }
 678 
 679     /**
 680      * Transitions to TERMINATED state if either (SHUTDOWN and pool
 681      * and queue empty) or (STOP and pool empty).  If otherwise
 682      * eligible to terminate but workerCount is nonzero, interrupts an
 683      * idle worker to ensure that shutdown signals propagate. This
 684      * method must be called following any action that might make
 685      * termination possible -- reducing worker count or removing tasks
 686      * from the queue during shutdown. The method is non-private to
 687      * allow access from ScheduledThreadPoolExecutor.
 688      */
 689     final void tryTerminate() {
 690         for (;;) {
 691             int c = ctl.get();
 692             if (isRunning(c) ||
 693                 runStateAtLeast(c, TIDYING) ||
 694                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
 695                 return;
 696             if (workerCountOf(c) != 0) { // Eligible to terminate
 697                 interruptIdleWorkers(ONLY_ONE);
 698                 return;
 699             }
 700 
 701             final ReentrantLock mainLock = this.mainLock;
 702             mainLock.lock();
 703             try {
 704                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
 705                     try {
 706                         terminated();
 707                     } finally {
 708                         ctl.set(ctlOf(TERMINATED, 0));
 709                         termination.signalAll();
 710                     }
 711                     return;
 712                 }
 713             } finally {
 714                 mainLock.unlock();
 715             }
 716             // else retry on failed CAS
 717         }
 718     }
 719 
 720     /*
 721      * Methods for controlling interrupts to worker threads.
 722      */
 723 
 724     /**
 725      * If there is a security manager, makes sure caller has
 726      * permission to shut down threads in general (see shutdownPerm).
 727      * If this passes, additionally makes sure the caller is allowed
 728      * to interrupt each worker thread. This might not be true even if
 729      * first check passed, if the SecurityManager treats some threads
 730      * specially.
 731      */
 732     private void checkShutdownAccess() {
 733         SecurityManager security = System.getSecurityManager();
 734         if (security != null) {
 735             security.checkPermission(shutdownPerm);
 736             final ReentrantLock mainLock = this.mainLock;
 737             mainLock.lock();
 738             try {
 739                 for (Worker w : workers)
 740                     security.checkAccess(w.thread);
 741             } finally {
 742                 mainLock.unlock();
 743             }
 744         }
 745     }
 746 
 747     /**
 748      * Interrupts all threads, even if active. Ignores SecurityExceptions
 749      * (in which case some threads may remain uninterrupted).
 750      */
 751     private void interruptWorkers() {
 752         final ReentrantLock mainLock = this.mainLock;
 753         mainLock.lock();
 754         try {
 755             for (Worker w : workers)
 756                 w.interruptIfStarted();
 757         } finally {
 758             mainLock.unlock();
 759         }
 760     }
 761 
 762     /**
 763      * Interrupts threads that might be waiting for tasks (as
 764      * indicated by not being locked) so they can check for
 765      * termination or configuration changes. Ignores
 766      * SecurityExceptions (in which case some threads may remain
 767      * uninterrupted).
 768      *
 769      * @param onlyOne If true, interrupt at most one worker. This is
 770      * called only from tryTerminate when termination is otherwise
 771      * enabled but there are still other workers.  In this case, at
 772      * most one waiting worker is interrupted to propagate shutdown
 773      * signals in case all threads are currently waiting.
 774      * Interrupting any arbitrary thread ensures that newly arriving
 775      * workers since shutdown began will also eventually exit.
 776      * To guarantee eventual termination, it suffices to always
 777      * interrupt only one idle worker, but shutdown() interrupts all
 778      * idle workers so that redundant workers exit promptly, not
 779      * waiting for a straggler task to finish.
 780      */
 781     private void interruptIdleWorkers(boolean onlyOne) {
 782         final ReentrantLock mainLock = this.mainLock;
 783         mainLock.lock();
 784         try {
 785             for (Worker w : workers) {
 786                 Thread t = w.thread;
 787                 if (!t.isInterrupted() && w.tryLock()) {
 788                     try {
 789                         t.interrupt();
 790                     } catch (SecurityException ignore) {
 791                     } finally {
 792                         w.unlock();
 793                     }
 794                 }
 795                 if (onlyOne)
 796                     break;
 797             }
 798         } finally {
 799             mainLock.unlock();
 800         }
 801     }
 802 
 803     /**
 804      * Common form of interruptIdleWorkers, to avoid having to
 805      * remember what the boolean argument means.
 806      */
 807     private void interruptIdleWorkers() {
 808         interruptIdleWorkers(false);
 809     }
 810 
 811     private static final boolean ONLY_ONE = true;
 812 
 813     /*
 814      * Misc utilities, most of which are also exported to
 815      * ScheduledThreadPoolExecutor
 816      */
 817 
 818     /**
 819      * Invokes the rejected execution handler for the given command.
 820      * Package-protected for use by ScheduledThreadPoolExecutor.
 821      */
 822     final void reject(Runnable command) {
 823         handler.rejectedExecution(command, this);
 824     }
 825 
 826     /**
 827      * Performs any further cleanup following run state transition on
 828      * invocation of shutdown.  A no-op here, but used by
 829      * ScheduledThreadPoolExecutor to cancel delayed tasks.
 830      */
 831     void onShutdown() {
 832     }
 833 
 834     /**
 835      * State check needed by ScheduledThreadPoolExecutor to
 836      * enable running tasks during shutdown.
 837      *
 838      * @param shutdownOK true if should return true if SHUTDOWN
 839      */
 840     final boolean isRunningOrShutdown(boolean shutdownOK) {
 841         int rs = runStateOf(ctl.get());
 842         return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
 843     }
 844 
 845     /**
 846      * Drains the task queue into a new list, normally using
 847      * drainTo. But if the queue is a DelayQueue or any other kind of
 848      * queue for which poll or drainTo may fail to remove some
 849      * elements, it deletes them one by one.
 850      */
 851     private List<Runnable> drainQueue() {
 852         BlockingQueue<Runnable> q = workQueue;
 853         ArrayList<Runnable> taskList = new ArrayList<Runnable>();
 854         q.drainTo(taskList);
 855         if (!q.isEmpty()) {
 856             for (Runnable r : q.toArray(new Runnable[0])) {
 857                 if (q.remove(r))
 858                     taskList.add(r);
 859             }
 860         }
 861         return taskList;
 862     }
 863 
 864     /*
 865      * Methods for creating, running and cleaning up after workers
 866      */
 867 
 868     /**
 869      * Checks if a new worker can be added with respect to current
 870      * pool state and the given bound (either core or maximum). If so,
 871      * the worker count is adjusted accordingly, and, if possible, a
 872      * new worker is created and started, running firstTask as its
 873      * first task. This method returns false if the pool is stopped or
 874      * eligible to shut down. It also returns false if the thread
 875      * factory fails to create a thread when asked.  If the thread
 876      * creation fails, either due to the thread factory returning
 877      * null, or due to an exception (typically OutOfMemoryError in
 878      * Thread.start()), we roll back cleanly.
 879      *
 880      * @param firstTask the task the new thread should run first (or
 881      * null if none). Workers are created with an initial first task
 882      * (in method execute()) to bypass queuing when there are fewer
 883      * than corePoolSize threads (in which case we always start one),
 884      * or when the queue is full (in which case we must bypass queue).
 885      * Initially idle threads are usually created via
 886      * prestartCoreThread or to replace other dying workers.
 887      *
 888      * @param core if true use corePoolSize as bound, else
 889      * maximumPoolSize. (A boolean indicator is used here rather than a
 890      * value to ensure reads of fresh values after checking other pool
 891      * state).
 892      * @return true if successful
 893      */
 894     private boolean addWorker(Runnable firstTask, boolean core) {
 895         retry:
 896         for (;;) {
 897             int c = ctl.get();
 898             int rs = runStateOf(c);
 899 
 900             // Check if queue empty only if necessary.
 901             if (rs >= SHUTDOWN &&
 902                 ! (rs == SHUTDOWN &&
 903                    firstTask == null &&
 904                    ! workQueue.isEmpty()))
 905                 return false;
 906 
 907             for (;;) {
 908                 int wc = workerCountOf(c);
 909                 if (wc >= CAPACITY ||
 910                     wc >= (core ? corePoolSize : maximumPoolSize))
 911                     return false;
 912                 if (compareAndIncrementWorkerCount(c))
 913                     break retry;
 914                 c = ctl.get();  // Re-read ctl
 915                 if (runStateOf(c) != rs)
 916                     continue retry;
 917                 // else CAS failed due to workerCount change; retry inner loop
 918             }
 919         }
 920 
 921         boolean workerStarted = false;
 922         boolean workerAdded = false;
 923         Worker w = null;
 924         try {
 925             w = new Worker(firstTask);
 926             final Thread t = w.thread;
 927             if (t != null) {
 928                 final ReentrantLock mainLock = this.mainLock;
 929                 mainLock.lock();
 930                 try {
 931                     // Recheck while holding lock.
 932                     // Back out on ThreadFactory failure or if
 933                     // shut down before lock acquired.
 934                     int rs = runStateOf(ctl.get());
 935 
 936                     if (rs < SHUTDOWN ||
 937                         (rs == SHUTDOWN && firstTask == null)) {
 938                         if (t.isAlive()) // precheck that t is startable
 939                             throw new IllegalThreadStateException();
 940                         workers.add(w);
 941                         int s = workers.size();
 942                         if (s > largestPoolSize)
 943                             largestPoolSize = s;
 944                         workerAdded = true;
 945                     }
 946                 } finally {
 947                     mainLock.unlock();
 948                 }
 949                 if (workerAdded) {
 950                     t.start();
 951                     workerStarted = true;
 952                 }
 953             }
 954         } finally {
 955             if (! workerStarted)
 956                 addWorkerFailed(w);
 957         }
 958         return workerStarted;
 959     }
 960 
 961     /**
 962      * Rolls back the worker thread creation.
 963      * - removes worker from workers, if present
 964      * - decrements worker count
 965      * - rechecks for termination, in case the existence of this
 966      *   worker was holding up termination
 967      */
 968     private void addWorkerFailed(Worker w) {
 969         final ReentrantLock mainLock = this.mainLock;
 970         mainLock.lock();
 971         try {
 972             if (w != null)
 973                 workers.remove(w);
 974             decrementWorkerCount();
 975             tryTerminate();
 976         } finally {
 977             mainLock.unlock();
 978         }
 979     }
 980 
 981     /**
 982      * Performs cleanup and bookkeeping for a dying worker. Called
 983      * only from worker threads. Unless completedAbruptly is set,
 984      * assumes that workerCount has already been adjusted to account
 985      * for exit.  This method removes thread from worker set, and
 986      * possibly terminates the pool or replaces the worker if either
 987      * it exited due to user task exception or if fewer than
 988      * corePoolSize workers are running or queue is non-empty but
 989      * there are no workers.
 990      *
 991      * @param w the worker
 992      * @param completedAbruptly if the worker died due to user exception
 993      */
 994     private void processWorkerExit(Worker w, boolean completedAbruptly) {
 995         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
 996             decrementWorkerCount();
 997 
 998         final ReentrantLock mainLock = this.mainLock;
 999         mainLock.lock();
1000         try {
1001             completedTaskCount += w.completedTasks;
1002             workers.remove(w);
1003         } finally {
1004             mainLock.unlock();
1005         }
1006 
1007         tryTerminate();
1008 
1009         int c = ctl.get();
1010         if (runStateLessThan(c, STOP)) {
1011             if (!completedAbruptly) {
1012                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
1013                 if (min == 0 && ! workQueue.isEmpty())
1014                     min = 1;
1015                 if (workerCountOf(c) >= min)
1016                     return; // replacement not needed
1017             }
1018             addWorker(null, false);
1019         }
1020     }
1021 
1022     /**
1023      * Performs blocking or timed wait for a task, depending on
1024      * current configuration settings, or returns null if this worker
1025      * must exit because of any of:
1026      * 1. There are more than maximumPoolSize workers (due to
1027      *    a call to setMaximumPoolSize).
1028      * 2. The pool is stopped.
1029      * 3. The pool is shutdown and the queue is empty.
1030      * 4. This worker timed out waiting for a task, and timed-out
1031      *    workers are subject to termination (that is,
1032      *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
1033      *    both before and after the timed wait, and if the queue is
1034      *    non-empty, this worker is not the last thread in the pool.
1035      *
1036      * @return task, or null if the worker must exit, in which case
1037      *         workerCount is decremented
1038      */
1039     private Runnable getTask() {
1040         boolean timedOut = false; // Did the last poll() time out?
1041 
1042         for (;;) {
1043             int c = ctl.get();
1044             int rs = runStateOf(c);
1045 
1046             // Check if queue empty only if necessary.
1047             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
1048                 decrementWorkerCount();
1049                 return null;
1050             }
1051 
1052             int wc = workerCountOf(c);
1053 
1054             // Are workers subject to culling?
1055             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
1056 
1057             if ((wc > maximumPoolSize || (timed && timedOut))
1058                 && (wc > 1 || workQueue.isEmpty())) {
1059                 if (compareAndDecrementWorkerCount(c))
1060                     return null;
1061                 continue;
1062             }
1063 
1064             try {
1065                 Runnable r = timed ?
1066                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
1067                     workQueue.take();
1068                 if (r != null)
1069                     return r;
1070                 timedOut = true;
1071             } catch (InterruptedException retry) {
1072                 timedOut = false;
1073             }
1074         }
1075     }
1076 
1077     /**
1078      * Main worker run loop.  Repeatedly gets tasks from queue and
1079      * executes them, while coping with a number of issues:
1080      *
1081      * 1. We may start out with an initial task, in which case we
1082      * don't need to get the first one. Otherwise, as long as pool is
1083      * running, we get tasks from getTask. If it returns null then the
1084      * worker exits due to changed pool state or configuration
1085      * parameters.  Other exits result from exception throws in
1086      * external code, in which case completedAbruptly holds, which
1087      * usually leads processWorkerExit to replace this thread.
1088      *
1089      * 2. Before running any task, the lock is acquired to prevent
1090      * other pool interrupts while the task is executing, and then we
1091      * ensure that unless pool is stopping, this thread does not have
1092      * its interrupt set.
1093      *
1094      * 3. Each task run is preceded by a call to beforeExecute, which
1095      * might throw an exception, in which case we cause thread to die
1096      * (breaking loop with completedAbruptly true) without processing
1097      * the task.
1098      *
1099      * 4. Assuming beforeExecute completes normally, we run the task,
1100      * gathering any of its thrown exceptions to send to afterExecute.
1101      * We separately handle RuntimeException, Error (both of which the
1102      * specs guarantee that we trap) and arbitrary Throwables.
1103      * Because we cannot rethrow Throwables within Runnable.run, we
1104      * wrap them within Errors on the way out (to the thread's
1105      * UncaughtExceptionHandler).  Any thrown exception also
1106      * conservatively causes thread to die.
1107      *
1108      * 5. After task.run completes, we call afterExecute, which may
1109      * also throw an exception, which will also cause thread to
1110      * die. According to JLS Sec 14.20, this exception is the one that
1111      * will be in effect even if task.run throws.
1112      *
1113      * The net effect of the exception mechanics is that afterExecute
1114      * and the thread's UncaughtExceptionHandler have as accurate
1115      * information as we can provide about any problems encountered by
1116      * user code.
1117      *
1118      * @param w the worker
1119      */
1120     final void runWorker(Worker w) {
1121         Thread wt = Thread.currentThread();
1122         Runnable task = w.firstTask;
1123         w.firstTask = null;
1124         w.unlock(); // allow interrupts
1125         boolean completedAbruptly = true;
1126         try {
1127             while (task != null || (task = getTask()) != null) {
1128                 w.lock();
1129                 // If pool is stopping, ensure thread is interrupted;
1130                 // if not, ensure thread is not interrupted.  This
1131                 // requires a recheck in second case to deal with
1132                 // shutdownNow race while clearing interrupt
1133                 if ((runStateAtLeast(ctl.get(), STOP) ||
1134                      (Thread.interrupted() &&
1135                       runStateAtLeast(ctl.get(), STOP))) &&
1136                     !wt.isInterrupted())
1137                     wt.interrupt();
1138                 try {
1139                     beforeExecute(wt, task);
1140                     Throwable thrown = null;
1141                     try {
1142                         task.run();
1143                     } catch (RuntimeException x) {
1144                         thrown = x; throw x;
1145                     } catch (Error x) {
1146                         thrown = x; throw x;
1147                     } catch (Throwable x) {
1148                         thrown = x; throw new Error(x);
1149                     } finally {
1150                         afterExecute(task, thrown);
1151                     }
1152                 } finally {
1153                     task = null;
1154                     w.completedTasks++;
1155                     w.unlock();
1156                 }
1157             }
1158             completedAbruptly = false;
1159         } finally {
1160             processWorkerExit(w, completedAbruptly);
1161         }
1162     }
1163 
1164     // Public constructors and methods
1165 
1166     /**
1167      * Creates a new {@code ThreadPoolExecutor} with the given initial
1168      * parameters and default thread factory and rejected execution handler.
1169      * It may be more convenient to use one of the {@link Executors} factory
1170      * methods instead of this general purpose constructor.
1171      *
1172      * @param corePoolSize the number of threads to keep in the pool, even
1173      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1174      * @param maximumPoolSize the maximum number of threads to allow in the
1175      *        pool
1176      * @param keepAliveTime when the number of threads is greater than
1177      *        the core, this is the maximum time that excess idle threads
1178      *        will wait for new tasks before terminating.
1179      * @param unit the time unit for the {@code keepAliveTime} argument
1180      * @param workQueue the queue to use for holding tasks before they are
1181      *        executed.  This queue will hold only the {@code Runnable}
1182      *        tasks submitted by the {@code execute} method.
1183      * @throws IllegalArgumentException if one of the following holds:<br>
1184      *         {@code corePoolSize < 0}<br>
1185      *         {@code keepAliveTime < 0}<br>
1186      *         {@code maximumPoolSize <= 0}<br>
1187      *         {@code maximumPoolSize < corePoolSize}
1188      * @throws NullPointerException if {@code workQueue} is null
1189      */
1190     public ThreadPoolExecutor(int corePoolSize,
1191                               int maximumPoolSize,
1192                               long keepAliveTime,
1193                               TimeUnit unit,
1194                               BlockingQueue<Runnable> workQueue) {
1195         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1196              Executors.defaultThreadFactory(), defaultHandler);
1197     }
1198 
1199     /**
1200      * Creates a new {@code ThreadPoolExecutor} with the given initial
1201      * parameters and default rejected execution handler.
1202      *
1203      * @param corePoolSize the number of threads to keep in the pool, even
1204      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1205      * @param maximumPoolSize the maximum number of threads to allow in the
1206      *        pool
1207      * @param keepAliveTime when the number of threads is greater than
1208      *        the core, this is the maximum time that excess idle threads
1209      *        will wait for new tasks before terminating.
1210      * @param unit the time unit for the {@code keepAliveTime} argument
1211      * @param workQueue the queue to use for holding tasks before they are
1212      *        executed.  This queue will hold only the {@code Runnable}
1213      *        tasks submitted by the {@code execute} method.
1214      * @param threadFactory the factory to use when the executor
1215      *        creates a new thread
1216      * @throws IllegalArgumentException if one of the following holds:<br>
1217      *         {@code corePoolSize < 0}<br>
1218      *         {@code keepAliveTime < 0}<br>
1219      *         {@code maximumPoolSize <= 0}<br>
1220      *         {@code maximumPoolSize < corePoolSize}
1221      * @throws NullPointerException if {@code workQueue}
1222      *         or {@code threadFactory} is null
1223      */
1224     public ThreadPoolExecutor(int corePoolSize,
1225                               int maximumPoolSize,
1226                               long keepAliveTime,
1227                               TimeUnit unit,
1228                               BlockingQueue<Runnable> workQueue,
1229                               ThreadFactory threadFactory) {
1230         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1231              threadFactory, defaultHandler);
1232     }
1233 
1234     /**
1235      * Creates a new {@code ThreadPoolExecutor} with the given initial
1236      * parameters and default thread factory.
1237      *
1238      * @param corePoolSize the number of threads to keep in the pool, even
1239      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1240      * @param maximumPoolSize the maximum number of threads to allow in the
1241      *        pool
1242      * @param keepAliveTime when the number of threads is greater than
1243      *        the core, this is the maximum time that excess idle threads
1244      *        will wait for new tasks before terminating.
1245      * @param unit the time unit for the {@code keepAliveTime} argument
1246      * @param workQueue the queue to use for holding tasks before they are
1247      *        executed.  This queue will hold only the {@code Runnable}
1248      *        tasks submitted by the {@code execute} method.
1249      * @param handler the handler to use when execution is blocked
1250      *        because the thread bounds and queue capacities are reached
1251      * @throws IllegalArgumentException if one of the following holds:<br>
1252      *         {@code corePoolSize < 0}<br>
1253      *         {@code keepAliveTime < 0}<br>
1254      *         {@code maximumPoolSize <= 0}<br>
1255      *         {@code maximumPoolSize < corePoolSize}
1256      * @throws NullPointerException if {@code workQueue}
1257      *         or {@code handler} is null
1258      */
1259     public ThreadPoolExecutor(int corePoolSize,
1260                               int maximumPoolSize,
1261                               long keepAliveTime,
1262                               TimeUnit unit,
1263                               BlockingQueue<Runnable> workQueue,
1264                               RejectedExecutionHandler handler) {
1265         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1266              Executors.defaultThreadFactory(), handler);
1267     }
1268 
1269     /**
1270      * Creates a new {@code ThreadPoolExecutor} with the given initial
1271      * parameters.
1272      *
1273      * @param corePoolSize the number of threads to keep in the pool, even
1274      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1275      * @param maximumPoolSize the maximum number of threads to allow in the
1276      *        pool
1277      * @param keepAliveTime when the number of threads is greater than
1278      *        the core, this is the maximum time that excess idle threads
1279      *        will wait for new tasks before terminating.
1280      * @param unit the time unit for the {@code keepAliveTime} argument
1281      * @param workQueue the queue to use for holding tasks before they are
1282      *        executed.  This queue will hold only the {@code Runnable}
1283      *        tasks submitted by the {@code execute} method.
1284      * @param threadFactory the factory to use when the executor
1285      *        creates a new thread
1286      * @param handler the handler to use when execution is blocked
1287      *        because the thread bounds and queue capacities are reached
1288      * @throws IllegalArgumentException if one of the following holds:<br>
1289      *         {@code corePoolSize < 0}<br>
1290      *         {@code keepAliveTime < 0}<br>
1291      *         {@code maximumPoolSize <= 0}<br>
1292      *         {@code maximumPoolSize < corePoolSize}
1293      * @throws NullPointerException if {@code workQueue}
1294      *         or {@code threadFactory} or {@code handler} is null
1295      */
1296     public ThreadPoolExecutor(int corePoolSize,
1297                               int maximumPoolSize,
1298                               long keepAliveTime,
1299                               TimeUnit unit,
1300                               BlockingQueue<Runnable> workQueue,
1301                               ThreadFactory threadFactory,
1302                               RejectedExecutionHandler handler) {
1303         if (corePoolSize < 0 ||
1304             maximumPoolSize <= 0 ||
1305             maximumPoolSize < corePoolSize ||
1306             keepAliveTime < 0)
1307             throw new IllegalArgumentException();
1308         if (workQueue == null || threadFactory == null || handler == null)
1309             throw new NullPointerException();
1310         this.corePoolSize = corePoolSize;
1311         this.maximumPoolSize = maximumPoolSize;
1312         this.workQueue = workQueue;
1313         this.keepAliveTime = unit.toNanos(keepAliveTime);
1314         this.threadFactory = threadFactory;
1315         this.handler = handler;
1316     }
1317 
1318     /**
1319      * Executes the given task sometime in the future.  The task
1320      * may execute in a new thread or in an existing pooled thread.
1321      *
1322      * If the task cannot be submitted for execution, either because this
1323      * executor has been shutdown or because its capacity has been reached,
1324      * the task is handled by the current {@code RejectedExecutionHandler}.
1325      *
1326      * @param command the task to execute
1327      * @throws RejectedExecutionException at discretion of
1328      *         {@code RejectedExecutionHandler}, if the task
1329      *         cannot be accepted for execution
1330      * @throws NullPointerException if {@code command} is null
1331      */
1332     public void execute(Runnable command) {
1333         if (command == null)
1334             throw new NullPointerException();
1335         /*
1336          * Proceed in 3 steps:
1337          *
1338          * 1. If fewer than corePoolSize threads are running, try to
1339          * start a new thread with the given command as its first
1340          * task.  The call to addWorker atomically checks runState and
1341          * workerCount, and so prevents false alarms that would add
1342          * threads when it shouldn't, by returning false.
1343          *
1344          * 2. If a task can be successfully queued, then we still need
1345          * to double-check whether we should have added a thread
1346          * (because existing ones died since last checking) or that
1347          * the pool shut down since entry into this method. So we
1348          * recheck state and if necessary roll back the enqueuing if
1349          * stopped, or start a new thread if there are none.
1350          *
1351          * 3. If we cannot queue task, then we try to add a new
1352          * thread.  If it fails, we know we are shut down or saturated
1353          * and so reject the task.
1354          */
1355         int c = ctl.get();
1356         if (workerCountOf(c) < corePoolSize) {
1357             if (addWorker(command, true))
1358                 return;
1359             c = ctl.get();
1360         }
1361         if (isRunning(c) && workQueue.offer(command)) {
1362             int recheck = ctl.get();
1363             if (! isRunning(recheck) && remove(command))
1364                 reject(command);
1365             else if (workerCountOf(recheck) == 0)
1366                 addWorker(null, false);
1367         }
1368         else if (!addWorker(command, false))
1369             reject(command);
1370     }
1371 
1372     /**
1373      * Initiates an orderly shutdown in which previously submitted
1374      * tasks are executed, but no new tasks will be accepted.
1375      * Invocation has no additional effect if already shut down.
1376      *
1377      * <p>This method does not wait for previously submitted tasks to
1378      * complete execution.  Use {@link #awaitTermination awaitTermination}
1379      * to do that.
1380      *
1381      * @throws SecurityException {@inheritDoc}
1382      */
1383     public void shutdown() {
1384         final ReentrantLock mainLock = this.mainLock;
1385         mainLock.lock();
1386         try {
1387             checkShutdownAccess();
1388             advanceRunState(SHUTDOWN);
1389             interruptIdleWorkers();
1390             onShutdown(); // hook for ScheduledThreadPoolExecutor
1391         } finally {
1392             mainLock.unlock();
1393         }
1394         tryTerminate();
1395     }
1396 
1397     /**
1398      * Attempts to stop all actively executing tasks, halts the
1399      * processing of waiting tasks, and returns a list of the tasks
1400      * that were awaiting execution. These tasks are drained (removed)
1401      * from the task queue upon return from this method.
1402      *
1403      * <p>This method does not wait for actively executing tasks to
1404      * terminate.  Use {@link #awaitTermination awaitTermination} to
1405      * do that.
1406      *
1407      * <p>There are no guarantees beyond best-effort attempts to stop
1408      * processing actively executing tasks.  This implementation
1409      * cancels tasks via {@link Thread#interrupt}, so any task that
1410      * fails to respond to interrupts may never terminate.
1411      *
1412      * @throws SecurityException {@inheritDoc}
1413      */
1414     public List<Runnable> shutdownNow() {
1415         List<Runnable> tasks;
1416         final ReentrantLock mainLock = this.mainLock;
1417         mainLock.lock();
1418         try {
1419             checkShutdownAccess();
1420             advanceRunState(STOP);
1421             interruptWorkers();
1422             tasks = drainQueue();
1423         } finally {
1424             mainLock.unlock();
1425         }
1426         tryTerminate();
1427         return tasks;
1428     }
1429 
1430     public boolean isShutdown() {
1431         return ! isRunning(ctl.get());
1432     }
1433 
1434     /**
1435      * Returns true if this executor is in the process of terminating
1436      * after {@link #shutdown} or {@link #shutdownNow} but has not
1437      * completely terminated.  This method may be useful for
1438      * debugging. A return of {@code true} reported a sufficient
1439      * period after shutdown may indicate that submitted tasks have
1440      * ignored or suppressed interruption, causing this executor not
1441      * to properly terminate.
1442      *
1443      * @return {@code true} if terminating but not yet terminated
1444      */
1445     public boolean isTerminating() {
1446         int c = ctl.get();
1447         return ! isRunning(c) && runStateLessThan(c, TERMINATED);
1448     }
1449 
1450     public boolean isTerminated() {
1451         return runStateAtLeast(ctl.get(), TERMINATED);
1452     }
1453 
1454     public boolean awaitTermination(long timeout, TimeUnit unit)
1455         throws InterruptedException {
1456         long nanos = unit.toNanos(timeout);
1457         final ReentrantLock mainLock = this.mainLock;
1458         mainLock.lock();
1459         try {
1460             for (;;) {
1461                 if (runStateAtLeast(ctl.get(), TERMINATED))
1462                     return true;
1463                 if (nanos <= 0)
1464                     return false;
1465                 nanos = termination.awaitNanos(nanos);
1466             }
1467         } finally {
1468             mainLock.unlock();
1469         }
1470     }
1471 
1472     /**
1473      * Invokes {@code shutdown} when this executor is no longer
1474      * referenced and it has no threads.
1475      */
1476     protected void finalize() {
1477         shutdown();
1478     }
1479 
1480     /**
1481      * Sets the thread factory used to create new threads.
1482      *
1483      * @param threadFactory the new thread factory
1484      * @throws NullPointerException if threadFactory is null
1485      * @see #getThreadFactory
1486      */
1487     public void setThreadFactory(ThreadFactory threadFactory) {
1488         if (threadFactory == null)
1489             throw new NullPointerException();
1490         this.threadFactory = threadFactory;
1491     }
1492 
1493     /**
1494      * Returns the thread factory used to create new threads.
1495      *
1496      * @return the current thread factory
1497      * @see #setThreadFactory(ThreadFactory)
1498      */
1499     public ThreadFactory getThreadFactory() {
1500         return threadFactory;
1501     }
1502 
1503     /**
1504      * Sets a new handler for unexecutable tasks.
1505      *
1506      * @param handler the new handler
1507      * @throws NullPointerException if handler is null
1508      * @see #getRejectedExecutionHandler
1509      */
1510     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
1511         if (handler == null)
1512             throw new NullPointerException();
1513         this.handler = handler;
1514     }
1515 
1516     /**
1517      * Returns the current handler for unexecutable tasks.
1518      *
1519      * @return the current handler
1520      * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
1521      */
1522     public RejectedExecutionHandler getRejectedExecutionHandler() {
1523         return handler;
1524     }
1525 
1526     /**
1527      * Sets the core number of threads.  This overrides any value set
1528      * in the constructor.  If the new value is smaller than the
1529      * current value, excess existing threads will be terminated when
1530      * they next become idle.  If larger, new threads will, if needed,
1531      * be started to execute any queued tasks.
1532      *
1533      * @param corePoolSize the new core size
1534      * @throws IllegalArgumentException if {@code corePoolSize < 0}
1535      * @see #getCorePoolSize
1536      */
1537     public void setCorePoolSize(int corePoolSize) {
1538         if (corePoolSize < 0)
1539             throw new IllegalArgumentException();
1540         int delta = corePoolSize - this.corePoolSize;
1541         this.corePoolSize = corePoolSize;
1542         if (workerCountOf(ctl.get()) > corePoolSize)
1543             interruptIdleWorkers();
1544         else if (delta > 0) {
1545             // We don't really know how many new threads are "needed".
1546             // As a heuristic, prestart enough new workers (up to new
1547             // core size) to handle the current number of tasks in
1548             // queue, but stop if queue becomes empty while doing so.
1549             int k = Math.min(delta, workQueue.size());
1550             while (k-- > 0 && addWorker(null, true)) {
1551                 if (workQueue.isEmpty())
1552                     break;
1553             }
1554         }
1555     }
1556 
1557     /**
1558      * Returns the core number of threads.
1559      *
1560      * @return the core number of threads
1561      * @see #setCorePoolSize
1562      */
1563     public int getCorePoolSize() {
1564         return corePoolSize;
1565     }
1566 
1567     /**
1568      * Starts a core thread, causing it to idly wait for work. This
1569      * overrides the default policy of starting core threads only when
1570      * new tasks are executed. This method will return {@code false}
1571      * if all core threads have already been started.
1572      *
1573      * @return {@code true} if a thread was started
1574      */
1575     public boolean prestartCoreThread() {
1576         return workerCountOf(ctl.get()) < corePoolSize &&
1577             addWorker(null, true);
1578     }
1579 
1580     /**
1581      * Same as prestartCoreThread except arranges that at least one
1582      * thread is started even if corePoolSize is 0.
1583      */
1584     void ensurePrestart() {
1585         int wc = workerCountOf(ctl.get());
1586         if (wc < corePoolSize)
1587             addWorker(null, true);
1588         else if (wc == 0)
1589             addWorker(null, false);
1590     }
1591 
1592     /**
1593      * Starts all core threads, causing them to idly wait for work. This
1594      * overrides the default policy of starting core threads only when
1595      * new tasks are executed.
1596      *
1597      * @return the number of threads started
1598      */
1599     public int prestartAllCoreThreads() {
1600         int n = 0;
1601         while (addWorker(null, true))
1602             ++n;
1603         return n;
1604     }
1605 
1606     /**
1607      * Returns true if this pool allows core threads to time out and
1608      * terminate if no tasks arrive within the keepAlive time, being
1609      * replaced if needed when new tasks arrive. When true, the same
1610      * keep-alive policy applying to non-core threads applies also to
1611      * core threads. When false (the default), core threads are never
1612      * terminated due to lack of incoming tasks.
1613      *
1614      * @return {@code true} if core threads are allowed to time out,
1615      *         else {@code false}
1616      *
1617      * @since 1.6
1618      */
1619     public boolean allowsCoreThreadTimeOut() {
1620         return allowCoreThreadTimeOut;
1621     }
1622 
1623     /**
1624      * Sets the policy governing whether core threads may time out and
1625      * terminate if no tasks arrive within the keep-alive time, being
1626      * replaced if needed when new tasks arrive. When false, core
1627      * threads are never terminated due to lack of incoming
1628      * tasks. When true, the same keep-alive policy applying to
1629      * non-core threads applies also to core threads. To avoid
1630      * continual thread replacement, the keep-alive time must be
1631      * greater than zero when setting {@code true}. This method
1632      * should in general be called before the pool is actively used.
1633      *
1634      * @param value {@code true} if should time out, else {@code false}
1635      * @throws IllegalArgumentException if value is {@code true}
1636      *         and the current keep-alive time is not greater than zero
1637      *
1638      * @since 1.6
1639      */
1640     public void allowCoreThreadTimeOut(boolean value) {
1641         if (value && keepAliveTime <= 0)
1642             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1643         if (value != allowCoreThreadTimeOut) {
1644             allowCoreThreadTimeOut = value;
1645             if (value)
1646                 interruptIdleWorkers();
1647         }
1648     }
1649 
1650     /**
1651      * Sets the maximum allowed number of threads. This overrides any
1652      * value set in the constructor. If the new value is smaller than
1653      * the current value, excess existing threads will be
1654      * terminated when they next become idle.
1655      *
1656      * @param maximumPoolSize the new maximum
1657      * @throws IllegalArgumentException if the new maximum is
1658      *         less than or equal to zero, or
1659      *         less than the {@linkplain #getCorePoolSize core pool size}
1660      * @see #getMaximumPoolSize
1661      */
1662     public void setMaximumPoolSize(int maximumPoolSize) {
1663         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1664             throw new IllegalArgumentException();
1665         this.maximumPoolSize = maximumPoolSize;
1666         if (workerCountOf(ctl.get()) > maximumPoolSize)
1667             interruptIdleWorkers();
1668     }
1669 
1670     /**
1671      * Returns the maximum allowed number of threads.
1672      *
1673      * @return the maximum allowed number of threads
1674      * @see #setMaximumPoolSize
1675      */
1676     public int getMaximumPoolSize() {
1677         return maximumPoolSize;
1678     }
1679 
1680     /**
1681      * Sets the time limit for which threads may remain idle before
1682      * being terminated.  If there are more than the core number of
1683      * threads currently in the pool, after waiting this amount of
1684      * time without processing a task, excess threads will be
1685      * terminated.  This overrides any value set in the constructor.
1686      *
1687      * @param time the time to wait.  A time value of zero will cause
1688      *        excess threads to terminate immediately after executing tasks.
1689      * @param unit the time unit of the {@code time} argument
1690      * @throws IllegalArgumentException if {@code time} less than zero or
1691      *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
1692      * @see #getKeepAliveTime(TimeUnit)
1693      */
1694     public void setKeepAliveTime(long time, TimeUnit unit) {
1695         if (time < 0)
1696             throw new IllegalArgumentException();
1697         if (time == 0 && allowsCoreThreadTimeOut())
1698             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1699         long keepAliveTime = unit.toNanos(time);
1700         long delta = keepAliveTime - this.keepAliveTime;
1701         this.keepAliveTime = keepAliveTime;
1702         if (delta < 0)
1703             interruptIdleWorkers();
1704     }
1705 
1706     /**
1707      * Returns the thread keep-alive time, which is the amount of time
1708      * that threads in excess of the core pool size may remain
1709      * idle before being terminated.
1710      *
1711      * @param unit the desired time unit of the result
1712      * @return the time limit
1713      * @see #setKeepAliveTime(long, TimeUnit)
1714      */
1715     public long getKeepAliveTime(TimeUnit unit) {
1716         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1717     }
1718 
1719     /* User-level queue utilities */
1720 
1721     /**
1722      * Returns the task queue used by this executor. Access to the
1723      * task queue is intended primarily for debugging and monitoring.
1724      * This queue may be in active use.  Retrieving the task queue
1725      * does not prevent queued tasks from executing.
1726      *
1727      * @return the task queue
1728      */
1729     public BlockingQueue<Runnable> getQueue() {
1730         return workQueue;
1731     }
1732 
1733     /**
1734      * Removes this task from the executor's internal queue if it is
1735      * present, thus causing it not to be run if it has not already
1736      * started.
1737      *
1738      * <p>This method may be useful as one part of a cancellation
1739      * scheme.  It may fail to remove tasks that have been converted
1740      * into other forms before being placed on the internal queue. For
1741      * example, a task entered using {@code submit} might be
1742      * converted into a form that maintains {@code Future} status.
1743      * However, in such cases, method {@link #purge} may be used to
1744      * remove those Futures that have been cancelled.
1745      *
1746      * @param task the task to remove
1747      * @return {@code true} if the task was removed
1748      */
1749     public boolean remove(Runnable task) {
1750         boolean removed = workQueue.remove(task);
1751         tryTerminate(); // In case SHUTDOWN and now empty
1752         return removed;
1753     }
1754 
1755     /**
1756      * Tries to remove from the work queue all {@link Future}
1757      * tasks that have been cancelled. This method can be useful as a
1758      * storage reclamation operation, that has no other impact on
1759      * functionality. Cancelled tasks are never executed, but may
1760      * accumulate in work queues until worker threads can actively
1761      * remove them. Invoking this method instead tries to remove them now.
1762      * However, this method may fail to remove tasks in
1763      * the presence of interference by other threads.
1764      */
1765     public void purge() {
1766         final BlockingQueue<Runnable> q = workQueue;
1767         try {
1768             Iterator<Runnable> it = q.iterator();
1769             while (it.hasNext()) {
1770                 Runnable r = it.next();
1771                 if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
1772                     it.remove();
1773             }
1774         } catch (ConcurrentModificationException fallThrough) {
1775             // Take slow path if we encounter interference during traversal.
1776             // Make copy for traversal and call remove for cancelled entries.
1777             // The slow path is more likely to be O(N*N).
1778             for (Object r : q.toArray())
1779                 if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
1780                     q.remove(r);
1781         }
1782 
1783         tryTerminate(); // In case SHUTDOWN and now empty
1784     }
1785 
1786     /* Statistics */
1787 
1788     /**
1789      * Returns the current number of threads in the pool.
1790      *
1791      * @return the number of threads
1792      */
1793     public int getPoolSize() {
1794         final ReentrantLock mainLock = this.mainLock;
1795         mainLock.lock();
1796         try {
1797             // Remove rare and surprising possibility of
1798             // isTerminated() && getPoolSize() > 0
1799             return runStateAtLeast(ctl.get(), TIDYING) ? 0
1800                 : workers.size();
1801         } finally {
1802             mainLock.unlock();
1803         }
1804     }
1805 
1806     /**
1807      * Returns the approximate number of threads that are actively
1808      * executing tasks.
1809      *
1810      * @return the number of threads
1811      */
1812     public int getActiveCount() {
1813         final ReentrantLock mainLock = this.mainLock;
1814         mainLock.lock();
1815         try {
1816             int n = 0;
1817             for (Worker w : workers)
1818                 if (w.isLocked())
1819                     ++n;
1820             return n;
1821         } finally {
1822             mainLock.unlock();
1823         }
1824     }
1825 
1826     /**
1827      * Returns the largest number of threads that have ever
1828      * simultaneously been in the pool.
1829      *
1830      * @return the number of threads
1831      */
1832     public int getLargestPoolSize() {
1833         final ReentrantLock mainLock = this.mainLock;
1834         mainLock.lock();
1835         try {
1836             return largestPoolSize;
1837         } finally {
1838             mainLock.unlock();
1839         }
1840     }
1841 
1842     /**
1843      * Returns the approximate total number of tasks that have ever been
1844      * scheduled for execution. Because the states of tasks and
1845      * threads may change dynamically during computation, the returned
1846      * value is only an approximation.
1847      *
1848      * @return the number of tasks
1849      */
1850     public long getTaskCount() {
1851         final ReentrantLock mainLock = this.mainLock;
1852         mainLock.lock();
1853         try {
1854             long n = completedTaskCount;
1855             for (Worker w : workers) {
1856                 n += w.completedTasks;
1857                 if (w.isLocked())
1858                     ++n;
1859             }
1860             return n + workQueue.size();
1861         } finally {
1862             mainLock.unlock();
1863         }
1864     }
1865 
1866     /**
1867      * Returns the approximate total number of tasks that have
1868      * completed execution. Because the states of tasks and threads
1869      * may change dynamically during computation, the returned value
1870      * is only an approximation, but one that does not ever decrease
1871      * across successive calls.
1872      *
1873      * @return the number of tasks
1874      */
1875     public long getCompletedTaskCount() {
1876         final ReentrantLock mainLock = this.mainLock;
1877         mainLock.lock();
1878         try {
1879             long n = completedTaskCount;
1880             for (Worker w : workers)
1881                 n += w.completedTasks;
1882             return n;
1883         } finally {
1884             mainLock.unlock();
1885         }
1886     }
1887 
1888     /**
1889      * Returns a string identifying this pool, as well as its state,
1890      * including indications of run state and estimated worker and
1891      * task counts.
1892      *
1893      * @return a string identifying this pool, as well as its state
1894      */
1895     public String toString() {
1896         long ncompleted;
1897         int nworkers, nactive;
1898         final ReentrantLock mainLock = this.mainLock;
1899         mainLock.lock();
1900         try {
1901             ncompleted = completedTaskCount;
1902             nactive = 0;
1903             nworkers = workers.size();
1904             for (Worker w : workers) {
1905                 ncompleted += w.completedTasks;
1906                 if (w.isLocked())
1907                     ++nactive;
1908             }
1909         } finally {
1910             mainLock.unlock();
1911         }
1912         int c = ctl.get();
1913         String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
1914                      (runStateAtLeast(c, TERMINATED) ? "Terminated" :
1915                       "Shutting down"));
1916         return super.toString() +
1917             "[" + rs +
1918             ", pool size = " + nworkers +
1919             ", active threads = " + nactive +
1920             ", queued tasks = " + workQueue.size() +
1921             ", completed tasks = " + ncompleted +
1922             "]";
1923     }
1924 
1925     /* Extension hooks */
1926 
1927     /**
1928      * Method invoked prior to executing the given Runnable in the
1929      * given thread.  This method is invoked by thread {@code t} that
1930      * will execute task {@code r}, and may be used to re-initialize
1931      * ThreadLocals, or to perform logging.
1932      *
1933      * <p>This implementation does nothing, but may be customized in
1934      * subclasses. Note: To properly nest multiple overridings, subclasses
1935      * should generally invoke {@code super.beforeExecute} at the end of
1936      * this method.
1937      *
1938      * @param t the thread that will run task {@code r}
1939      * @param r the task that will be executed
1940      */
1941     protected void beforeExecute(Thread t, Runnable r) { }
1942 
1943     /**
1944      * Method invoked upon completion of execution of the given Runnable.
1945      * This method is invoked by the thread that executed the task. If
1946      * non-null, the Throwable is the uncaught {@code RuntimeException}
1947      * or {@code Error} that caused execution to terminate abruptly.
1948      *
1949      * <p>This implementation does nothing, but may be customized in
1950      * subclasses. Note: To properly nest multiple overridings, subclasses
1951      * should generally invoke {@code super.afterExecute} at the
1952      * beginning of this method.
1953      *
1954      * <p><b>Note:</b> When actions are enclosed in tasks (such as
1955      * {@link FutureTask}) either explicitly or via methods such as
1956      * {@code submit}, these task objects catch and maintain
1957      * computational exceptions, and so they do not cause abrupt
1958      * termination, and the internal exceptions are <em>not</em>
1959      * passed to this method. If you would like to trap both kinds of
1960      * failures in this method, you can further probe for such cases,
1961      * as in this sample subclass that prints either the direct cause
1962      * or the underlying exception if a task has been aborted:
1963      *
1964      *  <pre> {@code
1965      * class ExtendedExecutor extends ThreadPoolExecutor {
1966      *   // ...
1967      *   protected void afterExecute(Runnable r, Throwable t) {
1968      *     super.afterExecute(r, t);
1969      *     if (t == null && r instanceof Future<?>) {
1970      *       try {
1971      *         Object result = ((Future<?>) r).get();
1972      *       } catch (CancellationException ce) {
1973      *           t = ce;
1974      *       } catch (ExecutionException ee) {
1975      *           t = ee.getCause();
1976      *       } catch (InterruptedException ie) {
1977      *           Thread.currentThread().interrupt(); // ignore/reset
1978      *       }
1979      *     }
1980      *     if (t != null)
1981      *       System.out.println(t);
1982      *   }
1983      * }}</pre>
1984      *
1985      * @param r the runnable that has completed
1986      * @param t the exception that caused termination, or null if
1987      * execution completed normally
1988      */
1989     protected void afterExecute(Runnable r, Throwable t) { }
1990 
1991     /**
1992      * Method invoked when the Executor has terminated.  Default
1993      * implementation does nothing. Note: To properly nest multiple
1994      * overridings, subclasses should generally invoke
1995      * {@code super.terminated} within this method.
1996      */
1997     protected void terminated() { }
1998 
1999     /* Predefined RejectedExecutionHandlers */
2000 
2001     /**
2002      * A handler for rejected tasks that runs the rejected task
2003      * directly in the calling thread of the {@code execute} method,
2004      * unless the executor has been shut down, in which case the task
2005      * is discarded.
2006      */
2007     public static class CallerRunsPolicy implements RejectedExecutionHandler {
2008         /**
2009          * Creates a {@code CallerRunsPolicy}.
2010          */
2011         public CallerRunsPolicy() { }
2012 
2013         /**
2014          * Executes task r in the caller's thread, unless the executor
2015          * has been shut down, in which case the task is discarded.
2016          *
2017          * @param r the runnable task requested to be executed
2018          * @param e the executor attempting to execute this task
2019          */
2020         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2021             if (!e.isShutdown()) {
2022                 r.run();
2023             }
2024         }
2025     }
2026 
2027     /**
2028      * A handler for rejected tasks that throws a
2029      * {@code RejectedExecutionException}.
2030      */
2031     public static class AbortPolicy implements RejectedExecutionHandler {
2032         /**
2033          * Creates an {@code AbortPolicy}.
2034          */
2035         public AbortPolicy() { }
2036 
2037         /**
2038          * Always throws RejectedExecutionException.
2039          *
2040          * @param r the runnable task requested to be executed
2041          * @param e the executor attempting to execute this task
2042          * @throws RejectedExecutionException always
2043          */
2044         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2045             throw new RejectedExecutionException("Task " + r.toString() +
2046                                                  " rejected from " +
2047                                                  e.toString());
2048         }
2049     }
2050 
2051     /**
2052      * A handler for rejected tasks that silently discards the
2053      * rejected task.
2054      */
2055     public static class DiscardPolicy implements RejectedExecutionHandler {
2056         /**
2057          * Creates a {@code DiscardPolicy}.
2058          */
2059         public DiscardPolicy() { }
2060 
2061         /**
2062          * Does nothing, which has the effect of discarding task r.
2063          *
2064          * @param r the runnable task requested to be executed
2065          * @param e the executor attempting to execute this task
2066          */
2067         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2068         }
2069     }
2070 
2071     /**
2072      * A handler for rejected tasks that discards the oldest unhandled
2073      * request and then retries {@code execute}, unless the executor
2074      * is shut down, in which case the task is discarded.
2075      */
2076     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2077         /**
2078          * Creates a {@code DiscardOldestPolicy} for the given executor.
2079          */
2080         public DiscardOldestPolicy() { }
2081 
2082         /**
2083          * Obtains and ignores the next task that the executor
2084          * would otherwise execute, if one is immediately available,
2085          * and then retries execution of task r, unless the executor
2086          * is shut down, in which case task r is instead discarded.
2087          *
2088          * @param r the runnable task requested to be executed
2089          * @param e the executor attempting to execute this task
2090          */
2091         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2092             if (!e.isShutdown()) {
2093                 e.getQueue().poll();
2094                 e.execute(r);
2095             }
2096         }
2097     }
2098 }
public class ThreadPoolExecutor extends AbstractExecutorService
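
  The listing above shows that execute() proceeds in three steps (fill the core threads, then the work queue, then non-core threads up to maximumPoolSize) and only hands a task to the configured RejectedExecutionHandler when all three fail. Below is a minimal sketch that exercises the full constructor together with one of the predefined policies; the pool sizes, queue capacity and thread-name prefix are illustrative values chosen for this example, not anything taken from the JDK source.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionPolicyDemo {
    public static void main(String[] args) throws InterruptedException {
        // A simple naming ThreadFactory; the "demo-pool-" prefix is only for illustration.
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            public Thread newThread(Runnable r) {
                return new Thread(r, "demo-pool-" + seq.getAndIncrement());
            }
        };

        // Full constructor: 2 core threads, 4 max, 30s keep-alive, a bounded queue of 2 slots,
        // and CallerRunsPolicy so that overflow tasks run in the submitting (main) thread.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            final int id = i;
            executor.execute(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " is executing task " + id);
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

  With these deliberately small numbers the pool saturates quickly, so depending on timing some of the later tasks may be rejected and, under CallerRunsPolicy, execute on the main thread, which is easy to spot in the printed thread names.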

  Now let's take a look at AbstractExecutorService:

  1 /*
  2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  3  *
  4  *
  5  *
  6  *
  7  *
  8  *
  9  *
 10  *
 11  *
 12  *
 13  *
 14  *
 15  *
 16  *
 17  *
 18  *
 19  *
 20  *
 21  *
 22  *
 23  */
 24 
 25 /*
 26  *
 27  *
 28  *
 29  *
 30  *
 31  * Written by Doug Lea with assistance from members of JCP JSR-166
 32  * Expert Group and released to the public domain, as explained at
 33  * http://creativecommons.org/publicdomain/zero/1.0/
 34  */
 35 
 36 package java.util.concurrent;
 37 import java.util.*;
 38 
 39 /**
 40  * Provides default implementations of {@link ExecutorService}
 41  * execution methods. This class implements the {@code submit},
 42  * {@code invokeAny} and {@code invokeAll} methods using a
 43  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
 44  * to the {@link FutureTask} class provided in this package.  For example,
 45  * the implementation of {@code submit(Runnable)} creates an
 46  * associated {@code RunnableFuture} that is executed and
 47  * returned. Subclasses may override the {@code newTaskFor} methods
 48  * to return {@code RunnableFuture} implementations other than
 49  * {@code FutureTask}.
 50  *
 51  * <p><b>Extension example</b>. Here is a sketch of a class
 52  * that customizes {@link ThreadPoolExecutor} to use
 53  * a {@code CustomTask} class instead of the default {@code FutureTask}:
 54  *  <pre> {@code
 55  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
 56  *
 57  *   static class CustomTask<V> implements RunnableFuture<V> {...}
 58  *
 59  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
 60  *       return new CustomTask<V>(c);
 61  *   }
 62  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
 63  *       return new CustomTask<V>(r, v);
 64  *   }
 65  *   // ... add constructors, etc.
 66  * }}</pre>
 67  *
 68  * @since 1.5
 69  * @author Doug Lea
 70  */
 71 public abstract class AbstractExecutorService implements ExecutorService {
 72 
 73     /**
 74      * Returns a {@code RunnableFuture} for the given runnable and default
 75      * value.
 76      *
 77      * @param runnable the runnable task being wrapped
 78      * @param value the default value for the returned future
 79      * @param <T> the type of the given value
 80      * @return a {@code RunnableFuture} which, when run, will run the
 81      * underlying runnable and which, as a {@code Future}, will yield
 82      * the given value as its result and provide for cancellation of
 83      * the underlying task
 84      * @since 1.6
 85      */
 86     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
 87         return new FutureTask<T>(runnable, value);
 88     }
 89 
 90     /**
 91      * Returns a {@code RunnableFuture} for the given callable task.
 92      *
 93      * @param callable the callable task being wrapped
 94      * @param <T> the type of the callable's result
 95      * @return a {@code RunnableFuture} which, when run, will call the
 96      * underlying callable and which, as a {@code Future}, will yield
 97      * the callable's result as its result and provide for
 98      * cancellation of the underlying task
 99      * @since 1.6
100      */
101     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
102         return new FutureTask<T>(callable);
103     }
104 
105     /**
106      * @throws RejectedExecutionException {@inheritDoc}
107      * @throws NullPointerException       {@inheritDoc}
108      */
109     public Future<?> submit(Runnable task) {
110         if (task == null) throw new NullPointerException();
111         RunnableFuture<Void> ftask = newTaskFor(task, null);
112         execute(ftask);
113         return ftask;
114     }
115 
116     /**
117      * @throws RejectedExecutionException {@inheritDoc}
118      * @throws NullPointerException       {@inheritDoc}
119      */
120     public <T> Future<T> submit(Runnable task, T result) {
121         if (task == null) throw new NullPointerException();
122         RunnableFuture<T> ftask = newTaskFor(task, result);
123         execute(ftask);
124         return ftask;
125     }
126 
127     /**
128      * @throws RejectedExecutionException {@inheritDoc}
129      * @throws NullPointerException       {@inheritDoc}
130      */
131     public <T> Future<T> submit(Callable<T> task) {
132         if (task == null) throw new NullPointerException();
133         RunnableFuture<T> ftask = newTaskFor(task);
134         execute(ftask);
135         return ftask;
136     }
137 
138     /**
139      * the main mechanics of invokeAny.
140      */
141     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
142                               boolean timed, long nanos)
143         throws InterruptedException, ExecutionException, TimeoutException {
144         if (tasks == null)
145             throw new NullPointerException();
146         int ntasks = tasks.size();
147         if (ntasks == 0)
148             throw new IllegalArgumentException();
149         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
150         ExecutorCompletionService<T> ecs =
151             new ExecutorCompletionService<T>(this);
152 
153         // For efficiency, especially in executors with limited
154         // parallelism, check to see if previously submitted tasks are
155         // done before submitting more of them. This interleaving
156         // plus the exception mechanics account for messiness of main
157         // loop.
158 
159         try {
160             // Record exceptions so that if we fail to obtain any
161             // result, we can throw the last exception we got.
162             ExecutionException ee = null;
163             final long deadline = timed ? System.nanoTime() + nanos : 0L;
164             Iterator<? extends Callable<T>> it = tasks.iterator();
165 
166             // Start one task for sure; the rest incrementally
167             futures.add(ecs.submit(it.next()));
168             --ntasks;
169             int active = 1;
170 
171             for (;;) {
172                 Future<T> f = ecs.poll();
173                 if (f == null) {
174                     if (ntasks > 0) {
175                         --ntasks;
176                         futures.add(ecs.submit(it.next()));
177                         ++active;
178                     }
179                     else if (active == 0)
180                         break;
181                     else if (timed) {
182                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
183                         if (f == null)
184                             throw new TimeoutException();
185                         nanos = deadline - System.nanoTime();
186                     }
187                     else
188                         f = ecs.take();
189                 }
190                 if (f != null) {
191                     --active;
192                     try {
193                         return f.get();
194                     } catch (ExecutionException eex) {
195                         ee = eex;
196                     } catch (RuntimeException rex) {
197                         ee = new ExecutionException(rex);
198                     }
199                 }
200             }
201 
202             if (ee == null)
203                 ee = new ExecutionException();
204             throw ee;
205 
206         } finally {
207             for (int i = 0, size = futures.size(); i < size; i++)
208                 futures.get(i).cancel(true);
209         }
210     }
211 
212     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
213         throws InterruptedException, ExecutionException {
214         try {
215             return doInvokeAny(tasks, false, 0);
216         } catch (TimeoutException cannotHappen) {
217             assert false;
218             return null;
219         }
220     }
221 
222     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
223                            long timeout, TimeUnit unit)
224         throws InterruptedException, ExecutionException, TimeoutException {
225         return doInvokeAny(tasks, true, unit.toNanos(timeout));
226     }
227 
228     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
229         throws InterruptedException {
230         if (tasks == null)
231             throw new NullPointerException();
232         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
233         boolean done = false;
234         try {
235             for (Callable<T> t : tasks) {
236                 RunnableFuture<T> f = newTaskFor(t);
237                 futures.add(f);
238                 execute(f);
239             }
240             for (int i = 0, size = futures.size(); i < size; i++) {
241                 Future<T> f = futures.get(i);
242                 if (!f.isDone()) {
243                     try {
244                         f.get();
245                     } catch (CancellationException ignore) {
246                     } catch (ExecutionException ignore) {
247                     }
248                 }
249             }
250             done = true;
251             return futures;
252         } finally {
253             if (!done)
254                 for (int i = 0, size = futures.size(); i < size; i++)
255                     futures.get(i).cancel(true);
256         }
257     }
258 
259     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
260                                          long timeout, TimeUnit unit)
261         throws InterruptedException {
262         if (tasks == null)
263             throw new NullPointerException();
264         long nanos = unit.toNanos(timeout);
265         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
266         boolean done = false;
267         try {
268             for (Callable<T> t : tasks)
269                 futures.add(newTaskFor(t));
270 
271             final long deadline = System.nanoTime() + nanos;
272             final int size = futures.size();
273 
274             // Interleave time checks and calls to execute in case
275             // executor doesn't have any/much parallelism.
276             for (int i = 0; i < size; i++) {
277                 execute((Runnable)futures.get(i));
278                 nanos = deadline - System.nanoTime();
279                 if (nanos <= 0L)
280                     return futures;
281             }
282 
283             for (int i = 0; i < size; i++) {
284                 Future<T> f = futures.get(i);
285                 if (!f.isDone()) {
286                     if (nanos <= 0L)
287                         return futures;
288                     try {
289                         f.get(nanos, TimeUnit.NANOSECONDS);
290                     } catch (CancellationException ignore) {
291                     } catch (ExecutionException ignore) {
292                     } catch (TimeoutException toe) {
293                         return futures;
294                     }
295                     nanos = deadline - System.nanoTime();
296                 }
297             }
298             done = true;
299             return futures;
300         } finally {
301             if (!done)
302                 for (int i = 0, size = futures.size(); i < size; i++)
303                     futures.get(i).cancel(true);
304         }
305     }
306 
307 }
public abstract class AbstractExecutorService implements ExecutorService
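
  The point to notice in AbstractExecutorService is that submit() does not run anything by itself: it wraps the task in a RunnableFuture (a FutureTask by default, produced by newTaskFor), passes it to execute(), and returns that future so the caller can collect the result later. Here is a short sketch of what that looks like from the caller's side; the fixed pool size and the task bodies are made-up examples, not part of the source above.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        // submit(Callable) -> newTaskFor wraps it in a FutureTask -> execute(futureTask),
        // and the same FutureTask is returned so get() can block for the result.
        Future<Integer> answer = pool.submit(new Callable<Integer>() {
            public Integer call() {
                return 21 * 2;
            }
        });
        System.out.println("submit result: " + answer.get());

        // invokeAll submits every task and blocks until all of them complete (or fail).
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            tasks.add(new Callable<String>() {
                public String call() {
                    return "task " + id + " finished on " + Thread.currentThread().getName();
                }
            });
        }
        for (Future<String> f : pool.invokeAll(tasks)) {
            System.out.println(f.get());
        }

        pool.shutdown();
    }
}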

  Finally, let's look at ExecutorService:

  1 /*
  2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  3  *
  4  *
  5  *
  6  *
  7  *
  8  *
  9  *
 10  *
 11  *
 12  *
 13  *
 14  *
 15  *
 16  *
 17  *
 18  *
 19  *
 20  *
 21  *
 22  *
 23  */
 24 
 25 /*
 26  *
 27  *
 28  *
 29  *
 30  *
 31  * Written by Doug Lea with assistance from members of JCP JSR-166
 32  * Expert Group and released to the public domain, as explained at
 33  * http://creativecommons.org/publicdomain/zero/1.0/
 34  */
 35 
 36 package java.util.concurrent;
 37 import java.util.List;
 38 import java.util.Collection;
 39 
 40 /**
 41  * An {@link Executor} that provides methods to manage termination and
 42  * methods that can produce a {@link Future} for tracking progress of
 43  * one or more asynchronous tasks.
 44  *
 45  * <p>An {@code ExecutorService} can be shut down, which will cause
 46  * it to reject new tasks.  Two different methods are provided for
 47  * shutting down an {@code ExecutorService}. The {@link #shutdown}
 48  * method will allow previously submitted tasks to execute before
 49  * terminating, while the {@link #shutdownNow} method prevents waiting
 50  * tasks from starting and attempts to stop currently executing tasks.
 51  * Upon termination, an executor has no tasks actively executing, no
 52  * tasks awaiting execution, and no new tasks can be submitted.  An
 53  * unused {@code ExecutorService} should be shut down to allow
 54  * reclamation of its resources.
 55  *
 56  * <p>Method {@code submit} extends base method {@link
 57  * Executor#execute(Runnable)} by creating and returning a {@link Future}
 58  * that can be used to cancel execution and/or wait for completion.
 59  * Methods {@code invokeAny} and {@code invokeAll} perform the most
 60  * commonly useful forms of bulk execution, executing a collection of
 61  * tasks and then waiting for at least one, or all, to
 62  * complete. (Class {@link ExecutorCompletionService} can be used to
 63  * write customized variants of these methods.)
 64  *
 65  * <p>The {@link Executors} class provides factory methods for the
 66  * executor services provided in this package.
 67  *
 68  * <h3>Usage Examples</h3>
 69  *
 70  * Here is a sketch of a network service in which threads in a thread
 71  * pool service incoming requests. It uses the preconfigured {@link
 72  * Executors#newFixedThreadPool} factory method:
 73  *
 74  *  <pre> {@code
 75  * class NetworkService implements Runnable {
 76  *   private final ServerSocket serverSocket;
 77  *   private final ExecutorService pool;
 78  *
 79  *   public NetworkService(int port, int poolSize)
 80  *       throws IOException {
 81  *     serverSocket = new ServerSocket(port);
 82  *     pool = Executors.newFixedThreadPool(poolSize);
 83  *   }
 84  *
 85  *   public void run() { // run the service
 86  *     try {
 87  *       for (;;) {
 88  *         pool.execute(new Handler(serverSocket.accept()));
 89  *       }
 90  *     } catch (IOException ex) {
 91  *       pool.shutdown();
 92  *     }
 93  *   }
 94  * }
 95  *
 96  * class Handler implements Runnable {
 97  *   private final Socket socket;
 98  *   Handler(Socket socket) { this.socket = socket; }
 99  *   public void run() {
100  *     // read and service request on socket
101  *   }
102  * }}</pre>
103  *
104  * The following method shuts down an {@code ExecutorService} in two phases,
105  * first by calling {@code shutdown} to reject incoming tasks, and then
106  * calling {@code shutdownNow}, if necessary, to cancel any lingering tasks:
107  *
108  *  <pre> {@code
109  * void shutdownAndAwaitTermination(ExecutorService pool) {
110  *   pool.shutdown(); // Disable new tasks from being submitted
111  *   try {
112  *     // Wait a while for existing tasks to terminate
113  *     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
114  *       pool.shutdownNow(); // Cancel currently executing tasks
115  *       // Wait a while for tasks to respond to being cancelled
116  *       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
117  *           System.err.println("Pool did not terminate");
118  *     }
119  *   } catch (InterruptedException ie) {
120  *     // (Re-)Cancel if current thread also interrupted
121  *     pool.shutdownNow();
122  *     // Preserve interrupt status
123  *     Thread.currentThread().interrupt();
124  *   }
125  * }}</pre>
126  *
127  * <p>Memory consistency effects: Actions in a thread prior to the
128  * submission of a {@code Runnable} or {@code Callable} task to an
129  * {@code ExecutorService}
130  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
131  * any actions taken by that task, which in turn <i>happen-before</i> the
132  * result is retrieved via {@code Future.get()}.
133  *
134  * @since 1.5
135  * @author Doug Lea
136  */
137 public interface ExecutorService extends Executor {
138 
139     /**
140      * Initiates an orderly shutdown in which previously submitted
141      * tasks are executed, but no new tasks will be accepted.
142      * Invocation has no additional effect if already shut down.
143      *
144      * <p>This method does not wait for previously submitted tasks to
145      * complete execution.  Use {@link #awaitTermination awaitTermination}
146      * to do that.
147      *
148      * @throws SecurityException if a security manager exists and
149      *         shutting down this ExecutorService may manipulate
150      *         threads that the caller is not permitted to modify
151      *         because it does not hold {@link
152      *         java.lang.RuntimePermission}{@code ("modifyThread")},
153      *         or the security manager's {@code checkAccess} method
154      *         denies access.
155      */
156     void shutdown();
157 
158     /**
159      * Attempts to stop all actively executing tasks, halts the
160      * processing of waiting tasks, and returns a list of the tasks
161      * that were awaiting execution.
162      *
163      * <p>This method does not wait for actively executing tasks to
164      * terminate.  Use {@link #awaitTermination awaitTermination} to
165      * do that.
166      *
167      * <p>There are no guarantees beyond best-effort attempts to stop
168      * processing actively executing tasks.  For example, typical
169      * implementations will cancel via {@link Thread#interrupt}, so any
170      * task that fails to respond to interrupts may never terminate.
171      *
172      * @return list of tasks that never commenced execution
173      * @throws SecurityException if a security manager exists and
174      *         shutting down this ExecutorService may manipulate
175      *         threads that the caller is not permitted to modify
176      *         because it does not hold {@link
177      *         java.lang.RuntimePermission}{@code ("modifyThread")},
178      *         or the security manager's {@code checkAccess} method
179      *         denies access.
180      */
181     List<Runnable> shutdownNow();
182 
183     /**
184      * Returns {@code true} if this executor has been shut down.
185      *
186      * @return {@code true} if this executor has been shut down
187      */
188     boolean isShutdown();
189 
190     /**
191      * Returns {@code true} if all tasks have completed following shut down.
192      * Note that {@code isTerminated} is never {@code true} unless
193      * either {@code shutdown} or {@code shutdownNow} was called first.
194      *
195      * @return {@code true} if all tasks have completed following shut down
196      */
197     boolean isTerminated();
198 
199     /**
200      * Blocks until all tasks have completed execution after a shutdown
201      * request, or the timeout occurs, or the current thread is
202      * interrupted, whichever happens first.
203      *
204      * @param timeout the maximum time to wait
205      * @param unit the time unit of the timeout argument
206      * @return {@code true} if this executor terminated and
207      *         {@code false} if the timeout elapsed before termination
208      * @throws InterruptedException if interrupted while waiting
209      */
210     boolean awaitTermination(long timeout, TimeUnit unit)
211         throws InterruptedException;
212 
213     /**
214      * Submits a value-returning task for execution and returns a
215      * Future representing the pending results of the task. The
216      * Future's {@code get} method will return the task's result upon
217      * successful completion.
218      *
219      * <p>
220      * If you would like to immediately block waiting
221      * for a task, you can use constructions of the form
222      * {@code result = exec.submit(aCallable).get();}
223      *
224      * <p>Note: The {@link Executors} class includes a set of methods
225      * that can convert some other common closure-like objects,
226      * for example, {@link java.security.PrivilegedAction} to
227      * {@link Callable} form so they can be submitted.
228      *
229      * @param task the task to submit
230      * @param <T> the type of the task's result
231      * @return a Future representing pending completion of the task
232      * @throws RejectedExecutionException if the task cannot be
233      *         scheduled for execution
234      * @throws NullPointerException if the task is null
235      */
236     <T> Future<T> submit(Callable<T> task);
237 
238     /**
239      * Submits a Runnable task for execution and returns a Future
240      * representing that task. The Future's {@code get} method will
241      * return the given result upon successful completion.
242      *
243      * @param task the task to submit
244      * @param result the result to return
245      * @param <T> the type of the result
246      * @return a Future representing pending completion of the task
247      * @throws RejectedExecutionException if the task cannot be
248      *         scheduled for execution
249      * @throws NullPointerException if the task is null
250      */
251     <T> Future<T> submit(Runnable task, T result);
252 
253     /**
254      * Submits a Runnable task for execution and returns a Future
255      * representing that task. The Future's {@code get} method will
256      * return {@code null} upon <em>successful</em> completion.
257      *
258      * @param task the task to submit
259      * @return a Future representing pending completion of the task
260      * @throws RejectedExecutionException if the task cannot be
261      *         scheduled for execution
262      * @throws NullPointerException if the task is null
263      */
264     Future<?> submit(Runnable task);
265 
266     /**
267      * Executes the given tasks, returning a list of Futures holding
268      * their status and results when all complete.
269      * {@link Future#isDone} is {@code true} for each
270      * element of the returned list.
271      * Note that a <em>completed</em> task could have
272      * terminated either normally or by throwing an exception.
273      * The results of this method are undefined if the given
274      * collection is modified while this operation is in progress.
275      *
276      * @param tasks the collection of tasks
277      * @param <T> the type of the values returned from the tasks
278      * @return a list of Futures representing the tasks, in the same
279      *         sequential order as produced by the iterator for the
280      *         given task list, each of which has completed
281      * @throws InterruptedException if interrupted while waiting, in
282      *         which case unfinished tasks are cancelled
283      * @throws NullPointerException if tasks or any of its elements are {@code null}
284      * @throws RejectedExecutionException if any task cannot be
285      *         scheduled for execution
286      */
287     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
288         throws InterruptedException;
289 
290     /**
291      * Executes the given tasks, returning a list of Futures holding
292      * their status and results
293      * when all complete or the timeout expires, whichever happens first.
294      * {@link Future#isDone} is {@code true} for each
295      * element of the returned list.
296      * Upon return, tasks that have not completed are cancelled.
297      * Note that a <em>completed</em> task could have
298      * terminated either normally or by throwing an exception.
299      * The results of this method are undefined if the given
300      * collection is modified while this operation is in progress.
301      *
302      * @param tasks the collection of tasks
303      * @param timeout the maximum time to wait
304      * @param unit the time unit of the timeout argument
305      * @param <T> the type of the values returned from the tasks
306      * @return a list of Futures representing the tasks, in the same
307      *         sequential order as produced by the iterator for the
308      *         given task list. If the operation did not time out,
309      *         each task will have completed. If it did time out, some
310      *         of these tasks will not have completed.
311      * @throws InterruptedException if interrupted while waiting, in
312      *         which case unfinished tasks are cancelled
313      * @throws NullPointerException if tasks, any of its elements, or
314      *         unit are {@code null}
315      * @throws RejectedExecutionException if any task cannot be scheduled
316      *         for execution
317      */
318     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
319                                   long timeout, TimeUnit unit)
320         throws InterruptedException;
321 
322     /**
323      * Executes the given tasks, returning the result
324      * of one that has completed successfully (i.e., without throwing
325      * an exception), if any do. Upon normal or exceptional return,
326      * tasks that have not completed are cancelled.
327      * The results of this method are undefined if the given
328      * collection is modified while this operation is in progress.
329      *
330      * @param tasks the collection of tasks
331      * @param <T> the type of the values returned from the tasks
332      * @return the result returned by one of the tasks
333      * @throws InterruptedException if interrupted while waiting
334      * @throws NullPointerException if tasks or any element task
335      *         subject to execution is {@code null}
336      * @throws IllegalArgumentException if tasks is empty
337      * @throws ExecutionException if no task successfully completes
338      * @throws RejectedExecutionException if tasks cannot be scheduled
339      *         for execution
340      */
341     <T> T invokeAny(Collection<? extends Callable<T>> tasks)
342         throws InterruptedException, ExecutionException;
343 
344     /**
345      * Executes the given tasks, returning the result
346      * of one that has completed successfully (i.e., without throwing
347      * an exception), if any do before the given timeout elapses.
348      * Upon normal or exceptional return, tasks that have not
349      * completed are cancelled.
350      * The results of this method are undefined if the given
351      * collection is modified while this operation is in progress.
352      *
353      * @param tasks the collection of tasks
354      * @param timeout the maximum time to wait
355      * @param unit the time unit of the timeout argument
356      * @param <T> the type of the values returned from the tasks
357      * @return the result returned by one of the tasks
358      * @throws InterruptedException if interrupted while waiting
359      * @throws NullPointerException if tasks, or unit, or any element
360      *         task subject to execution is {@code null}
361      * @throws TimeoutException if the given timeout elapses before
362      *         any task successfully completes
363      * @throws ExecutionException if no task successfully completes
364      * @throws RejectedExecutionException if tasks cannot be scheduled
365      *         for execution
366      */
367     <T> T invokeAny(Collection<? extends Callable<T>> tasks,
368                     long timeout, TimeUnit unit)
369         throws InterruptedException, ExecutionException, TimeoutException;
370 }

  Next, let's take a look at Executor:

  1 /*
  2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  3  *
  4  *
  5  *
  6  *
  7  *
  8  *
  9  *
 10  *
 11  *
 12  *
 13  *
 14  *
 15  *
 16  *
 17  *
 18  *
 19  *
 20  *
 21  *
 22  *
 23  */
 24 
 25 /*
 26  *
 27  *
 28  *
 29  *
 30  *
 31  * Written by Doug Lea with assistance from members of JCP JSR-166
 32  * Expert Group and released to the public domain, as explained at
 33  * http://creativecommons.org/publicdomain/zero/1.0/
 34  */
 35 
 36 package java.util.concurrent;
 37 
 38 /**
 39  * An object that executes submitted {@link Runnable} tasks. This
 40  * interface provides a way of decoupling task submission from the
 41  * mechanics of how each task will be run, including details of thread
 42  * use, scheduling, etc.  An {@code Executor} is normally used
 43  * instead of explicitly creating threads. For example, rather than
 44  * invoking {@code new Thread(new(RunnableTask())).start()} for each
 45  * of a set of tasks, you might use:
 46  *
 47  * <pre>
 48  * Executor executor = <em>anExecutor</em>;
 49  * executor.execute(new RunnableTask1());
 50  * executor.execute(new RunnableTask2());
 51  * ...
 52  * </pre>
 53  *
 54  * However, the {@code Executor} interface does not strictly
 55  * require that execution be asynchronous. In the simplest case, an
 56  * executor can run the submitted task immediately in the caller's
 57  * thread:
 58  *
 59  *  <pre> {@code
 60  * class DirectExecutor implements Executor {
 61  *   public void execute(Runnable r) {
 62  *     r.run();
 63  *   }
 64  * }}</pre>
 65  *
 66  * More typically, tasks are executed in some thread other
 67  * than the caller's thread.  The executor below spawns a new thread
 68  * for each task.
 69  *
 70  *  <pre> {@code
 71  * class ThreadPerTaskExecutor implements Executor {
 72  *   public void execute(Runnable r) {
 73  *     new Thread(r).start();
 74  *   }
 75  * }}</pre>
 76  *
 77  * Many {@code Executor} implementations impose some sort of
 78  * limitation on how and when tasks are scheduled.  The executor below
 79  * serializes the submission of tasks to a second executor,
 80  * illustrating a composite executor.
 81  *
 82  *  <pre> {@code
 83  * class SerialExecutor implements Executor {
 84  *   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
 85  *   final Executor executor;
 86  *   Runnable active;
 87  *
 88  *   SerialExecutor(Executor executor) {
 89  *     this.executor = executor;
 90  *   }
 91  *
 92  *   public synchronized void execute(final Runnable r) {
 93  *     tasks.offer(new Runnable() {
 94  *       public void run() {
 95  *         try {
 96  *           r.run();
 97  *         } finally {
 98  *           scheduleNext();
 99  *         }
100  *       }
101  *     });
102  *     if (active == null) {
103  *       scheduleNext();
104  *     }
105  *   }
106  *
107  *   protected synchronized void scheduleNext() {
108  *     if ((active = tasks.poll()) != null) {
109  *       executor.execute(active);
110  *     }
111  *   }
112  * }}</pre>
113  *
114  * The {@code Executor} implementations provided in this package
115  * implement {@link ExecutorService}, which is a more extensive
116  * interface.  The {@link ThreadPoolExecutor} class provides an
117  * extensible thread pool implementation. The {@link Executors} class
118  * provides convenient factory methods for these Executors.
119  *
120  * <p>Memory consistency effects: Actions in a thread prior to
121  * submitting a {@code Runnable} object to an {@code Executor}
122  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
123  * its execution begins, perhaps in another thread.
124  *
125  * @since 1.5
126  * @author Doug Lea
127  */
128 public interface Executor {
129 
130     /**
131      * Executes the given command at some time in the future.  The command
132      * may execute in a new thread, in a pooled thread, or in the calling
133      * thread, at the discretion of the {@code Executor} implementation.
134      *
135      * @param command the runnable task
136      * @throws RejectedExecutionException if this task cannot be
137      * accepted for execution
138      * @throws NullPointerException if command is null
139      */
140     void execute(Runnable command);
141 }

    2.2.1. Reading the ThreadPoolExecutor source code

     First, let's look at the constructors:

 1      /**
 2      * @param corePoolSize the number of threads to keep in the pool, even
 3      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 4      * @param maximumPoolSize the maximum number of threads to allow in the
 5      *        pool
 6      * @param keepAliveTime when the number of threads is greater than
 7      *        the core, this is the maximum time that excess idle threads
 8      *        will wait for new tasks before terminating.
 9      * @param unit the time unit for the {@code keepAliveTime} argument
10      * @param workQueue the queue to use for holding tasks before they are
11      *        executed.  This queue will hold only the {@code Runnable}
12      *        tasks submitted by the {@code execute} method.
13      * @param threadFactory the factory to use when the executor
14      *        creates a new thread
15      * @param handler the handler to use when execution is blocked
16      *        because the thread bounds and queue capacities are reached
17      */
18     public ThreadPoolExecutor(int corePoolSize,
19                               int maximumPoolSize,
20                               long keepAliveTime,
21                               TimeUnit unit,
22                               BlockingQueue<Runnable> workQueue,
23                               ThreadFactory threadFactory,
24                               RejectedExecutionHandler handler) {
25         if (corePoolSize < 0 ||
26             maximumPoolSize <= 0 ||
27             maximumPoolSize < corePoolSize ||
28             keepAliveTime < 0)
29             throw new IllegalArgumentException();
30         if (workQueue == null || threadFactory == null || handler == null)
31             throw new NullPointerException();
32         this.corePoolSize = corePoolSize;
33         this.maximumPoolSize = maximumPoolSize;
34         this.workQueue = workQueue;
35         this.keepAliveTime = unit.toNanos(keepAliveTime);
36         this.threadFactory = threadFactory;
37         this.handler = handler;
38     }
39     public ThreadPoolExecutor(int corePoolSize,
40                               int maximumPoolSize,
41                               long keepAliveTime,
42                               TimeUnit unit,
43                               BlockingQueue<Runnable> workQueue) {
44         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
45              Executors.defaultThreadFactory(), defaultHandler);
46     }
47 
48 
49     public ThreadPoolExecutor(int corePoolSize,
50                               int maximumPoolSize,
51                               long keepAliveTime,
52                               TimeUnit unit,
53                               BlockingQueue<Runnable> workQueue,
54                               ThreadFactory threadFactory) {
55         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
56              threadFactory, defaultHandler);
57     }
58 
59     public ThreadPoolExecutor(int corePoolSize,
60                               int maximumPoolSize,
61                               long keepAliveTime,
62                               TimeUnit unit,
63                               BlockingQueue<Runnable> workQueue,
64                               RejectedExecutionHandler handler) {
65         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
66              Executors.defaultThreadFactory(), handler);
67     }

    There are four constructors in total, but the other three simply delegate to the most complete one and fill in default values. The parameters mean the following:

    corePoolSize: the size of the core pool. By default a freshly created pool contains no threads at all; a thread is created only when a task arrives, unless prestartAllCoreThreads() or prestartCoreThread() is called to pre-start corePoolSize threads (or a single thread) beforehand. So, by default, each arriving task causes a new thread to be created until the thread count reaches corePoolSize; after that, newly arriving tasks are placed into the work queue;
    maximumPoolSize: the maximum number of threads the pool will ever create; tasks waiting in the queue do not count against this limit;
    keepAliveTime: how long an idle thread may wait for new work before it is terminated. By default this only takes effect while the pool holds more than corePoolSize threads: once such a thread has been idle for keepAliveTime, it is terminated. If allowCoreThreadTimeOut(boolean) has been called, keepAliveTime also applies when the thread count is within corePoolSize, so the pool can shrink all the way down to 0 threads;
    unit: the time unit of keepAliveTime; the TimeUnit class offers seven options:

1 TimeUnit.DAYS;               // days
2 TimeUnit.HOURS;              // hours
3 TimeUnit.MINUTES;            // minutes
4 TimeUnit.SECONDS;            // seconds
5 TimeUnit.MILLISECONDS;       // milliseconds
6 TimeUnit.MICROSECONDS;       // microseconds
7 TimeUnit.NANOSECONDS;        // nanoseconds

    workQueue: a blocking queue that holds tasks waiting to be executed. This choice matters a great deal, because it largely determines how the pool behaves under load. The usual options are:

1 ArrayBlockingQueue
2 LinkedBlockingQueue
3 SynchronousQueue
4 PriorityBlockingQueue

  ArrayBlockingQueue and PriorityBlockingQueue are used less often; LinkedBlockingQueue and SynchronousQueue are the common choices, and the pool's queuing behaviour is determined by which BlockingQueue is used.
    threadFactory: the thread factory used to create new threads;
    handler: the policy applied when a task has to be rejected; it can take one of the following four values (a full construction example using these parameters follows this list):

1 ThreadPoolExecutor.AbortPolicy: discards the task and throws a RejectedExecutionException (the default).
2 ThreadPoolExecutor.DiscardPolicy: discards the task as well, but without throwing an exception.
3 ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then retries the submission (repeating this process as needed).
4 ThreadPoolExecutor.CallerRunsPolicy: runs the task on the thread that called execute().
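
  To make the parameters concrete, here is a minimal sketch that calls the full seven-argument constructor with a custom ThreadFactory and CallerRunsPolicy; the pool sizes, queue capacity and thread-name prefix are arbitrary values chosen purely for illustration:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FullConstructorDemo {
    public static void main(String[] args) {
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            public Thread newThread(Runnable r) {
                return new Thread(r, "demo-pool-" + seq.getAndIncrement());
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30, TimeUnit.SECONDS,                 // keepAliveTime + unit for idle non-core threads
                new ArrayBlockingQueue<Runnable>(10), // bounded workQueue
                namedFactory,                         // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler for rejected tasks

        for (int i = 0; i < 20; i++) {
            final int n = i;
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " runs task " + n);
                }
            });
        }
        pool.shutdown();
    }
}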

   2.2.2. Thread pool states

 1      * The runState provides the main lifecycle control, taking on values:
 2      *
 3      *   RUNNING:  Accept new tasks and process queued tasks
 4      *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 5      *   STOP:     Don't accept new tasks, don't process queued tasks,
 6      *             and interrupt in-progress tasks
 7      *   TIDYING:  All tasks have terminated, workerCount is zero,
 8      *             the thread transitioning to state TIDYING
 9      *             will run the terminated() hook method
10      *   TERMINATED: terminated() has completed
11      *
12      * The numerical order among these values matters, to allow
13      * ordered comparisons. The runState monotonically increases over
14      * time, but need not hit each state. The transitions are:
15      *
16      * RUNNING -> SHUTDOWN
17      *    On invocation of shutdown(), perhaps implicitly in finalize()
18      * (RUNNING or SHUTDOWN) -> STOP
19      *    On invocation of shutdownNow()
20      * SHUTDOWN -> TIDYING
21      *    When both queue and pool are empty
22      * STOP -> TIDYING
23      *    When pool is empty
24      * TIDYING -> TERMINATED
25      *    When the terminated() hook method has completed
26      *
27      * Threads waiting in awaitTermination() will return when the
28      * state reaches TERMINATED.
29      *
30      * Detecting the transition from SHUTDOWN to TIDYING is less
31      * straightforward than you'd like because the queue may become
32      * empty after non-empty and vice versa during SHUTDOWN state, but
33      * we can only terminate if, after seeing that it is empty, we see
34      * that workerCount is 0 (which sometimes entails a recheck -- see
35      * below).
36      */
37     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
38     private static final int COUNT_BITS = Integer.SIZE - 3;
39     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
40 
41     // runState is stored in the high-order bits
42     private static final int RUNNING    = -1 << COUNT_BITS;
43     private static final int SHUTDOWN   =  0 << COUNT_BITS;
44     private static final int STOP       =  1 << COUNT_BITS;
45     private static final int TIDYING    =  2 << COUNT_BITS;
46     private static final int TERMINATED =  3 << COUNT_BITS;

    Here we can see how the five pool states are defined. Note that the run state is stored in the high-order bits of ctl, while the worker count occupies the low-order bits, as the sketch below demonstrates.
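
  A minimal sketch of how this packing works, reproducing the same bit arithmetic outside the class (the helper names mirror the ctlOf/runStateOf/workerCountOf helpers that sit next to these constants in the JDK source):

public class CtlDemo {
    private static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // maximum worker count

    private static final int RUNNING  = -1 << COUNT_BITS;
    private static final int SHUTDOWN =  0 << COUNT_BITS;

    // Pack and unpack the state word exactly as ThreadPoolExecutor does.
    private static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    private static int runStateOf(int ctl)                  { return ctl & ~CAPACITY; }
    private static int workerCountOf(int ctl)               { return ctl & CAPACITY; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 7);  // a running pool with 7 workers
        System.out.println("runState == RUNNING? " + (runStateOf(ctl) == RUNNING)); // true
        System.out.println("workerCount          = " + workerCountOf(ctl));         // 7
        System.out.println("SHUTDOWN > RUNNING?  " + (SHUTDOWN > RUNNING));         // true, so ordered comparisons work
    }
}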

   2.2.3. Task execution

  Before walking through what happens from the moment a task is submitted until it finishes, let's look at a few other important member fields of ThreadPoolExecutor (the listings below come from an older version of the class, which is why they use poolSize and runState fields rather than the ctl word shown above):

 1 private final BlockingQueue<Runnable> workQueue;              // the work queue that holds tasks waiting to be executed
 2 private final ReentrantLock mainLock = new ReentrantLock();   // the main lock guarding pool state (pool size, runState, etc.)
 3 private final HashSet<Worker> workers = new HashSet<Worker>();// the set of worker threads
 4 private volatile long keepAliveTime;                          // how long an idle thread may live
 5 private volatile boolean allowCoreThreadTimeOut;              // whether core threads may time out as well
 6 private volatile int corePoolSize;                            // core pool size (once the thread count exceeds it, submitted tasks go to the work queue)
 7 private volatile int maximumPoolSize;                         // the maximum number of threads the pool tolerates
 8 private volatile int poolSize;                                // the current number of threads in the pool
 9 private volatile RejectedExecutionHandler handler;            // the task rejection policy
10 private volatile ThreadFactory threadFactory;                 // the thread factory used to create threads
11 private int largestPoolSize;                                  // records the largest thread count the pool has ever reached
12 private long completedTaskCount;                              // records the number of tasks that have completed

   Three of these fields deserve special attention: corePoolSize, maximumPoolSize, and largestPoolSize.
  corePoolSize is usually translated as "core pool size"; in practice it is simply the pool's normal working size. A simple analogy:

  Imagine a factory with 10 workers, each of whom can handle only one job at a time. As long as any of the 10 workers is idle, an incoming job is handed to an idle worker. When all 10 are busy, new jobs have to queue up and wait. If new jobs keep arriving much faster than the workers can finish them, the factory manager takes remedial action, for example hiring 4 temporary workers and assigning jobs to them as well. If even 14 workers cannot keep up, the manager has to stop accepting new jobs or discard some of the queued ones. Later, when some of the 14 workers are idle again and new jobs arrive only slowly, the manager may let the 4 temporary workers go and keep just the original 10, since extra workers cost money.

  In this analogy, corePoolSize is 10 and maximumPoolSize is 14 (10 + 4).
  In other words, corePoolSize is the pool's normal size, while maximumPoolSize is a safety valve for the moments when the workload suddenly spikes.
  largestPoolSize is merely a bookkeeping field that records the largest number of threads the pool has ever held; it has nothing to do with the pool's capacity.

   Now let's trace the path a task takes from submission to completion.
  Inside ThreadPoolExecutor the central submission method is execute(). Tasks can also be submitted through submit(), but submit() ultimately calls execute() as well, so it is enough to study how execute() is implemented:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
1   First, if the submitted task command is null, a NullPointerException is thrown.
2   Next, in if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)), the || operator evaluates its left side first: if the current thread count is not below the core size, control falls straight into the inner if block. If the thread count is below the core size, addIfUnderCorePoolSize(command) is called; when it returns false, control likewise falls into the inner if block, otherwise the method is already finished.
3   So, when addIfUnderCorePoolSize returns false, the next check is if (runState == RUNNING && workQueue.offer(command)): if the pool is in the RUNNING state, the task is offered to the work queue. If the pool is not RUNNING, or the offer fails, addIfUnderMaximumPoolSize(command) is tried; if that fails too, reject(command) applies the rejection policy.
4   Going back one step:
5     if the pool is RUNNING and the task was queued successfully, one more check is made:
6     if (runState != RUNNING || poolSize == 0) guards against the race where another thread calls shutdown or shutdownNow at the very moment the task is being queued. In that case ensureQueuedTaskHandled(command) is called as a remedial step; as its name suggests, it makes sure a task that has already entered the queue still gets handled.

  Now let's look at the implementation of the two key helper methods, addIfUnderCorePoolSize and addIfUnderMaximumPoolSize:

 1 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
 2     Thread t = null;
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         if (poolSize < corePoolSize && runState == RUNNING)
 7             t = addThread(firstTask);        // create a thread to run firstTask
 8     } finally {
 9         mainLock.unlock();
10     }
11     if (t == null)
12         return false;
13     t.start();
14     return true;
15 }

   This is addIfUnderCorePoolSize; as the name suggests, it is the path taken while the pool is still below its core size. It first acquires the main lock, because the pool's state is about to change, and then re-checks that the current thread count is below corePoolSize. One might wonder why this check is repeated, since execute() already verified it before calling this method. The reason is simple: the check in execute() was done without holding the lock, so between that check and this point other threads may have submitted tasks and pushed poolSize up to or beyond corePoolSize, which is why it has to be verified again under the lock. The method also re-checks that the pool state is still RUNNING, because another thread may have called shutdown or shutdownNow in the meantime. It then executes

t = addThread(firstTask);

    This call is the crucial part: it takes the submitted task as its argument and returns a Thread. If the returned t is null, thread creation failed (i.e. poolSize >= corePoolSize or runState is no longer RUNNING) and the method returns false; otherwise t.start() is called to start the thread.
  Let's look at how addThread is implemented:

 1 private Thread addThread(Runnable firstTask) {
 2     Worker w = new Worker(firstTask);
 3     Thread t = threadFactory.newThread(w);  // create a new thread for the worker
 4     if (t != null) {
 5         w.thread = t;            // store the created thread in the worker's thread field
 6         workers.add(w);
 7         int nt = ++poolSize;     // increment the current thread count
 8         if (nt > largestPoolSize)
 9             largestPoolSize = nt;
10     }
11     return t;
12 }

   In addThread, a Worker object is first created around the submitted task, then the thread factory threadFactory creates a new thread t, the reference to t is stored in the Worker's thread field, and finally workers.add(w) adds the Worker to the worker set.
  Now let's look at the Worker class itself:

 1 private final class Worker implements Runnable {
 2     private final ReentrantLock runLock = new ReentrantLock();
 3     private Runnable firstTask;
 4     volatile long completedTasks;
 5     Thread thread;
 6     Worker(Runnable firstTask) {
 7         this.firstTask = firstTask;
 8     }
 9     boolean isActive() {
10         return runLock.isLocked();
11     }
12     void interruptIfIdle() {
13         final ReentrantLock runLock = this.runLock;
14         if (runLock.tryLock()) {
15             try {
16         if (thread != Thread.currentThread())
17         thread.interrupt();
18             } finally {
19                 runLock.unlock();
20             }
21         }
22     }
23     void interruptNow() {
24         thread.interrupt();
25     }
26  
27     private void runTask(Runnable task) {
28         final ReentrantLock runLock = this.runLock;
29         runLock.lock();
30         try {
31             if (runState < STOP &&
32                 Thread.interrupted() &&
33                 runState >= STOP)
34                 thread.interrupt();        // re-assert the interrupt if the pool started stopping concurrently
35             boolean ran = false;
36             beforeExecute(thread, task);   // beforeExecute() is an empty hook in ThreadPoolExecutor; override it (and afterExecute below) to collect statistics such as how long each task took
37             try {
38                 task.run();
39                 ran = true;
40                 afterExecute(task, null);
41                 ++completedTasks;
42             } catch (RuntimeException ex) {
43                 if (!ran)
44                     afterExecute(task, ex);
45                 throw ex;
46             }
47         } finally {
48             runLock.unlock();
49         }
50     }
51  
52     public void run() {
53         try {
54             Runnable task = firstTask;
55             firstTask = null;
56             while (task != null || (task = getTask()) != null) {
57                 runTask(task);
58                 task = null;
59             }
60         } finally {
61         workerDone(this);   // clean up once there are no more tasks to run
62         }
63     }
64 }

  Worker actually implements Runnable, so Thread t = threadFactory.newThread(w); above has essentially the same effect as Thread t = new Thread(w);: a Runnable is handed to thread t, which then executes it. Since Worker implements Runnable, its most important method is naturally run():

 1 public void run() {
 2     try {
 3         Runnable task = firstTask;
 4         firstTask = null;
 5         while (task != null || (task = getTask()) != null) {
 6             runTask(task);
 7             task = null;
 8         }
 9     } finally {
10         workerDone(this);
11     }
12 }

   As the run() method shows, the worker first executes the task handed in through its constructor, firstTask; after runTask() has finished it, the while loop keeps fetching new tasks via getTask(). Where do they come from? From the work queue, naturally. getTask() is a method of ThreadPoolExecutor, not of Worker; here is its implementation:

 1 Runnable getTask() {
 2     for (;;) {
 3         try {
 4             int state = runState;
 5             if (state > SHUTDOWN)
 6                 return null;
 7             Runnable r;
 8             if (state == SHUTDOWN)  // Help drain queue
 9                 r = workQueue.poll();
10             else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // if there are more threads than the core size, or core threads may time out,
11                 // fetch the task with a timed poll; if nothing arrives within keepAliveTime, null is returned
12                 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
13             else
14                 r = workQueue.take();
15             if (r != null)
16                 return r;
17             if (workerCanExit()) {    // no task was fetched (r is null): check whether this worker may exit
18                 if (runState >= SHUTDOWN) // Wake up others
19                     interruptIdleWorkers();   // interrupt the workers that are sitting idle
20                 return null;
21             }
22             // Else retry
23         } catch (InterruptedException ie) {
24             // On interruption, re-check runState
25         }
26     }
27 }

   In getTask, the pool state is checked first: if runState is greater than SHUTDOWN (i.e. STOP or TERMINATED), null is returned immediately. If runState is SHUTDOWN or RUNNING, a task is fetched from the work queue.
   If the pool currently holds more threads than corePoolSize, or core threads are allowed to time out, the task is fetched with poll(time, timeUnit), which waits up to keepAliveTime and returns null if nothing arrives; otherwise take() is used, which blocks until a task is available.
   If the fetched task r is null, workerCanExit() decides whether the current worker is allowed to exit. Here is workerCanExit():

 1 private boolean workerCanExit() {
 2     final ReentrantLock mainLock = this.mainLock;
 3     mainLock.lock();
 4     boolean canExit;
 5     // the worker may exit if runState is at least STOP, or the work queue is empty,
 6     // or core threads may time out and poolSize > Math.max(1, corePoolSize)
 7     try {
 8         canExit = runState >= STOP ||
 9             workQueue.isEmpty() ||
10             (allowCoreThreadTimeOut &&
11              poolSize > Math.max(1, corePoolSize));
12     } finally {
13         mainLock.unlock();
14     }
15     return canExit;
16 }

   In other words, a worker may exit when the pool has reached at least the STOP state, when the work queue is empty, or when core threads may time out and the thread count exceeds Math.max(1, corePoolSize). If the worker may exit and the pool is shutting down, interruptIdleWorkers() is called to wake up the other idle workers:

 1 void interruptIdleWorkers() {
 2     final ReentrantLock mainLock = this.mainLock;
 3     mainLock.lock();
 4     try {
 5         for (Worker w : workers)  // delegates to each worker's interruptIfIdle() method
 6             w.interruptIfIdle();
 7     } finally {
 8         mainLock.unlock();
 9     }
10 }

   As the implementation shows, it simply calls interruptIfIdle() on each worker. Inside Worker.interruptIfIdle():

 1 void interruptIfIdle() {
 2     final ReentrantLock runLock = this.runLock;
 3     if (runLock.tryLock()) {
 4      // note the use of tryLock(): if this worker is currently running a task it already holds runLock, so tryLock() fails;
 5      // succeeding in acquiring the lock therefore proves that the worker is idle
 6         try {
 7           if (thread != Thread.currentThread())  
 8                   thread.interrupt();
 9         } finally {
10             runLock.unlock();
11         }
12     }
13 }

   There is a rather elegant design decision here. If we designed a thread pool ourselves, we might add a dedicated dispatcher thread that hands a task from the queue to whichever worker becomes idle. ThreadPoolExecutor does not do that, because a dispatcher thread would itself have to be managed and would add complexity; instead, a thread that has just finished a task simply goes back to the work queue and pulls the next task itself.
  Now look at addIfUnderMaximumPoolSize. Its implementation closely mirrors addIfUnderCorePoolSize; the only difference is that it runs once the pool has reached its core size and the task could not be added to the work queue:

 1 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
 2     Thread t = null;
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         if (poolSize < maximumPoolSize && runState == RUNNING)
 7             t = addThread(firstTask);
 8     } finally {
 9         mainLock.unlock();
10     }
11     if (t == null)
12         return false;
13     t.start();
14     return true;
15 }

  It is essentially identical to addIfUnderCorePoolSize, except that the condition in the if statement is poolSize < maximumPoolSize instead of poolSize < corePoolSize.
  At this point we have a basic picture of what happens between submitting a task and having it executed. To summarise:

1   1) Be clear about what corePoolSize and maximumPoolSize mean;
2   2) Understand what the Worker class is for;
3   3) Know the handling strategy applied once a task is submitted, which boils down to four rules (a small demo of these rules follows this list):
4     If the current thread count is below corePoolSize, a new thread is created for every submitted task;
5     If the current thread count is at least corePoolSize, each submitted task is offered to the work queue; if that succeeds, the task waits for an idle thread to pick it up, and if it fails (usually because the queue is full), the pool tries to create an additional thread for the task;
6     If the thread count has already reached maximumPoolSize, the rejection policy is applied;
7     When the thread count exceeds corePoolSize, any thread idle for longer than keepAliveTime is terminated until the count falls back to corePoolSize; if core threads are allowed to time out, idle core threads are terminated in the same way.
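
  A minimal sketch of these rules in action; the pool sizes, queue capacity and sleep durations are arbitrary values chosen purely for illustration. With core size 1, maximum size 2 and a queue of capacity 1, the fourth of four slow tasks finds the queue full and the pool at its maximum, so it is rejected:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionRulesDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));   // core=1, max=2, queue capacity=1

        Runnable slowTask = new Runnable() {
            public void run() {
                try { Thread.sleep(2000); } catch (InterruptedException e) { }
            }
        };

        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(slowTask);
                System.out.println("task " + i + " accepted, poolSize=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                // task 4: queue full and poolSize already equals maximumPoolSize, so AbortPolicy rejects it
                System.out.println("task " + i + " rejected");
            }
        }
        pool.shutdown();
    }
}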

 2.2.4. Pre-starting threads in the pool

     By default a newly created pool contains no threads; threads are created only once tasks are submitted.
   If threads should be created as soon as the pool is constructed, two methods make that possible:

1     prestartCoreThread(): starts one core thread;
2     prestartAllCoreThreads(): starts all core threads

   Here is how these two methods are implemented:

 1 public boolean prestartCoreThread() {
 2     return addIfUnderCorePoolSize(null); // note that the argument passed in is null
 3 }
 4  
 5 public int prestartAllCoreThreads() {
 6     int n = 0;
 7     while (addIfUnderCorePoolSize(null))  // note that the argument passed in is null
 8         ++n;
 9     return n;
10 }

    Note that null is passed in as the task. As the earlier analysis showed, a worker started with a null firstTask goes straight into getTask() and blocks on r = workQueue.take(), i.e. it waits for a task to appear in the work queue. A short usage sketch follows below.
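
  For example, a minimal sketch showing the effect of prestartAllCoreThreads(); the pool sizes are arbitrary illustration values:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrestartDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        System.out.println("threads before prestart: " + pool.getPoolSize()); // 0
        int started = pool.prestartAllCoreThreads();                          // creates the 4 core threads up front
        System.out.println("threads started: " + started);                    // 4
        System.out.println("threads after prestart: " + pool.getPoolSize());  // 4

        pool.shutdown();
    }
}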

2.2.5. The work queue and queuing policies

  We have repeatedly mentioned the work queue, workQueue, which holds the tasks waiting to be executed.
  Its type is BlockingQueue<Runnable>, and three implementations are commonly used (a short construction sketch follows this list):

1  ArrayBlockingQueue: an array-based, bounded FIFO queue whose capacity must be specified at creation time;
2  LinkedBlockingQueue: a linked-list-based FIFO queue; if no capacity is given it defaults to Integer.MAX_VALUE, i.e. it is effectively unbounded;
3  SynchronousQueue: a special queue that stores no tasks at all; every submission must be handed directly to a thread, so the pool keeps creating new threads (up to maximumPoolSize) for incoming tasks.
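
  A minimal sketch contrasting the three choices; the sizes and capacities are arbitrary illustration values. Which queue you pick decides whether back-pressure comes from a bounded queue, from an ever-growing unbounded queue, or from spinning up extra threads:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueChoiceDemo {
    public static void main(String[] args) {
        // Bounded queue: at most 100 waiting tasks, then extra threads up to the maximum, then rejection.
        ThreadPoolExecutor bounded = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

        // Unbounded queue: maximumPoolSize is never reached; excess tasks pile up in memory instead.
        ThreadPoolExecutor unbounded = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        // Direct hand-off: nothing is queued; every task needs a free or newly created thread immediately.
        ThreadPoolExecutor handOff = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

        bounded.shutdown();
        unbounded.shutdown();
        handOff.shutdown();
    }
}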

2.2.6. Task rejection policies

  When the work queue is full and the thread count has reached maximumPoolSize, any further submission triggers the rejection policy. Four policies are provided (a sketch of a custom handler follows this list):

1 ThreadPoolExecutor.AbortPolicy: discards the task and throws a RejectedExecutionException (the default).
2 ThreadPoolExecutor.DiscardPolicy: discards the task as well, but without throwing an exception.
3 ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then retries the submission (repeating this process as needed).
4 ThreadPoolExecutor.CallerRunsPolicy: runs the task on the thread that called execute().
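
  Besides the four built-in policies you can supply your own RejectedExecutionHandler. A minimal sketch that logs the rejection and then falls back to running the task on the caller's thread (the logging is just an illustrative choice):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomRejectionDemo {
    public static void main(String[] args) {
        RejectedExecutionHandler logAndRun = new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("rejected: " + r + ", running it on the caller thread instead");
                if (!executor.isShutdown()) {
                    r.run();   // same fallback behaviour as CallerRunsPolicy
                }
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), logAndRun);

        for (int i = 0; i < 5; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try { Thread.sleep(200); } catch (InterruptedException e) { }
                }
            });
        }
        pool.shutdown();
    }
}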

2.2.7. Shutting down the pool

  ThreadPoolExecutor provides two methods for shutting the pool down, shutdown() and shutdownNow():

1     shutdown(): does not terminate the pool immediately; it stops accepting new tasks and lets all tasks already in the work queue finish first
2     shutdownNow(): terminates the pool immediately, attempts to interrupt the tasks that are currently running, clears the work queue, and returns the tasks that had not yet started

2.2.8. Adjusting the pool size at runtime

  ThreadPoolExecutor can resize the pool dynamically through setCorePoolSize() and setMaximumPoolSize():
    setCorePoolSize: sets the core pool size
    setMaximumPoolSize: sets the maximum number of threads the pool may create
  When these values are increased, ThreadPoolExecutor applies the new limits to its running threads and may immediately create new threads to run queued tasks, as in the sketch below.
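
  A minimal sketch of growing a pool under load; the sizes and sleep times are arbitrary illustration values:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < 10; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try { Thread.sleep(500); } catch (InterruptedException e) { }
                }
            });
        }
        System.out.println("before resize, poolSize=" + pool.getPoolSize());  // 2

        // Raise the maximum first, then the core size; queued tasks can now be picked up by new core threads.
        pool.setMaximumPoolSize(6);
        pool.setCorePoolSize(6);

        Thread.sleep(100);
        System.out.println("after resize, poolSize=" + pool.getPoolSize());   // up to 6
        pool.shutdown();
    }
}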

2.2.9. Choosing a sensible pool size

   The pool size should generally be chosen according to the type of task:
   For CPU-bound tasks, the aim is to keep every core busy; a common starting point is NCPU + 1 threads.
   For IO-bound tasks, a common starting point is 2 * NCPU.
   These are only starting points: set the pool size to the reference value first, then watch task throughput, system load and resource utilisation, and adjust accordingly, as in the sketch below.
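
  A minimal sketch that derives those starting points from the number of available processors; the multipliers are just the rules of thumb above, not hard rules:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizingDemo {
    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();

        ExecutorService cpuBoundPool = Executors.newFixedThreadPool(nCpu + 1);  // CPU-bound: NCPU + 1
        ExecutorService ioBoundPool  = Executors.newFixedThreadPool(2 * nCpu);  // IO-bound:  2 * NCPU

        System.out.println("NCPU = " + nCpu
                + ", cpu-bound pool = " + (nCpu + 1)
                + ", io-bound pool = " + (2 * nCpu));

        cpuBoundPool.shutdown();
        ioBoundPool.shutdown();
    }
}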

3. Commonly used thread pools

 3.1. newFixedThreadPool

    A fixed-size thread pool: you specify the size, and corePoolSize and maximumPoolSize are set to the same value, with a LinkedBlockingQueue of capacity Integer.MAX_VALUE as the work queue. The number of threads never changes: when a new task is submitted it runs immediately if a thread is idle, otherwise it waits in the queue. Because the LinkedBlockingQueue is effectively unbounded, it can grow very quickly when tasks are submitted faster than they are processed and may eventually exhaust system resources. Even when the pool is idle it does not release its worker threads, so it keeps holding some resources until shutdown() is called.

1 public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
3 }
4 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
5     return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
6 }
 1 package com.threadpool.test;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 public class NewFixedThreadPoolTest {
 7 
 8     private static Runnable getThread(final int i) {
 9         return new Runnable() {
10             public void run() {
11                 try {
12                     Thread.sleep(500);
13                 } catch (InterruptedException e) {
14                     e.printStackTrace();
15                 }
16                 System.out.println(i);
17             }
18         };
19     }
20 
21     public static void main(String args[]) {
22         ExecutorService fixPool = Executors.newFixedThreadPool(5);
23         for (int i = 0; i < 100; i++) {
24             fixPool.execute(getThread(i));
25         }
26         fixPool.shutdown();
27     }
28 }
Sample output:
  1 0
  2 1
  3 2
  4 3
  5 4
  6 5
  7 6
  8 7
  9 8
 10 9
 11 10
 12 11
 13 12
 14 13
 15 14
 16 15
 17 16
 18 17
 19 19
 20 18
 21 20
 22 21
 23 22
 24 23
 25 24
 26 25
 27 26
 28 27
 29 28
 30 29
 31 33
 32 32
 33 31
 34 30
 35 34
 36 35
 37 36
 38 37
 39 38
 40 39
 41 40
 42 42
 43 41
 44 43
 45 44
 46 45
 47 47
 48 46
 49 49
 50 48
 51 50
 52 51
 53 52
 54 54
 55 53
 56 55
 57 56
 58 57
 59 58
 60 59
 61 62
 62 60
 63 61
 64 63
 65 64
 66 65
 67 66
 68 67
 69 69
 70 68
 71 70
 72 71
 73 72
 74 74
 75 73
 76 75
 77 76
 78 77
 79 79
 80 78
 81 80
 82 82
 83 81
 84 83
 85 84
 86 85
 87 87
 88 86
 89 89
 90 88
 91 90
 92 92
 93 91
 94 94
 95 93
 96 95
 97 96
 98 97
 99 99
100 98

 3.2. newSingleThreadExecutor

    A single-thread pool: it contains exactly one thread and uses a LinkedBlockingQueue as its work queue. Any extra tasks submitted while that thread is busy wait in the queue and are executed later, strictly in first-in, first-out order.

1 public static ExecutorService newSingleThreadExecutor() {
2         return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
3 }
4 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
5         return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory));
6 }
 1 package com.threadpool.test;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 
 7 public class NewSingleThreadExecutorTest {
 8     private static Runnable getThread(final int i){
 9         return new Runnable() {
10             public void run() {
11                 try {
12 
13                     Thread.sleep(500);
14                 } catch (InterruptedException e) {
15                     e.printStackTrace();
16                 }
17                 System.out.println(i);
18             }
19         };
20     }
21 
22     public static void main(String args[]) throws InterruptedException {
23         ExecutorService singPool = Executors.newSingleThreadExecutor();
24         for (int i=0;i<100;i++){
25             singPool.execute(getThread(i));
26         }
27         singPool.shutdown();
28     }
29 }
Sample output:
  1 0
  2 1
  3 2
  4 3
  5 4
  6 5
  7 6
  8 7
  9 8
 10 9
 11 10
 12 11
 13 12
 14 13
 15 14
 16 15
 17 16
 18 17
 19 18
 20 19
 21 20
 22 21
 23 22
 24 23
 25 24
 26 25
 27 26
 28 27
 29 28
 30 29
 31 30
 32 31
 33 32
 34 33
 35 34
 36 35
 37 36
 38 37
 39 38
 40 39
 41 40
 42 41
 43 42
 44 43
 45 44
 46 45
 47 46
 48 47
 49 48
 50 49
 51 50
 52 51
 53 52
 54 53
 55 54
 56 55
 57 56
 58 57
 59 58
 60 59
 61 60
 62 61
 63 62
 64 63
 65 64
 66 65
 67 66
 68 67
 69 68
 70 69
 71 70
 72 71
 73 72
 74 73
 75 74
 76 75
 77 76
 78 77
 79 78
 80 79
 81 80
 82 81
 83 82
 84 83
 85 84
 86 85
 87 86
 88 87
 89 88
 90 89
 91 90
 92 91
 93 92
 94 93
 95 94
 96 95
 97 96
 98 97
 99 98
100 99

 3.3. newCachedThreadPool

    A cached thread pool whose idle threads are kept alive for 60 seconds by default. corePoolSize is 0, maximumPoolSize is Integer.MAX_VALUE, and the work queue is a SynchronousQueue, a direct hand-off queue that forces the pool to find or create a thread for every submitted task. A thread that stays idle longer than keepAliveTime (60 seconds) is terminated and reclaimed; when a new task arrives and no idle thread is available, a new thread is created, which carries some overhead. If a large number of not-so-quick tasks are submitted at once, the pool will spin up an equally large number of threads, which can quickly exhaust system resources.

1 public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
3 }
4 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
5         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
6 }
 1 package com.threadpool.test;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 public class NewCachedThreadPoolTest {
 7     private static Runnable getThread(final int i){
 8         return new Runnable() {
 9             public void run() {
10                 try {
11                     Thread.sleep(1000);
12                 }catch (Exception e){
13 
14                 }
15                 System.out.println(i);
16             }
17         };
18     }
19 
20     public static  void main(String args[]){
21         ExecutorService cachePool = Executors.newCachedThreadPool();
22         for (int i=1;i<=100;i++){
23             cachePool.execute(getThread(i));
24         }
25     }
26 }
Sample output:
  1 4
  2 3
  3 2
  4 1
  5 7
  6 6
  7 5
  8 14
  9 13
 10 12
 11 11
 12 10
 13 8
 14 15
 15 9
 16 18
 17 16
 18 17
 19 19
 20 24
 21 23
 22 22
 23 21
 24 20
 25 30
 26 29
 27 28
 28 27
 29 26
 30 25
 31 37
 32 36
 33 35
 34 34
 35 33
 36 32
 37 31
 38 39
 39 38
 40 40
 41 44
 42 45
 43 46
 44 43
 45 42
 46 41
 47 100
 48 98
 49 99
 50 97
 51 95
 52 96
 53 85
 54 86
 55 84
 56 87
 57 88
 58 89
 59 90
 60 91
 61 92
 62 93
 63 94
 64 71
 65 72
 66 73
 67 74
 68 75
 69 76
 70 77
 71 78
 72 79
 73 80
 74 81
 75 82
 76 83
 77 63
 78 64
 79 65
 80 66
 81 67
 82 69
 83 56
 84 58
 85 68
 86 60
 87 59
 88 61
 89 57
 90 62
 91 70
 92 50
 93 52
 94 54
 95 49
 96 47
 97 55
 98 53
 99 51
100 48

 3.4. newScheduledThreadPool

    A scheduled thread pool, used to run tasks periodically, for example for periodic data synchronisation. scheduleAtFixedRate runs the task at a fixed rate: the period is measured from the start of one execution to the start of the next. scheduleWithFixedDelay runs the task with a fixed delay: the delay is measured from the end of one execution to the start of the next. The sketch below contrasts the two modes.
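
  A minimal sketch contrasting the two scheduling modes; the 2-second period/delay and the 1-second task duration are arbitrary illustration values. With a 1-second task, the fixed-rate job starts roughly every 2 seconds, while the fixed-delay job starts roughly every 3 seconds (1-second task + 2-second delay):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateVsFixedDelayDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);

        // Period counted from start to start: executions begin roughly every 2 seconds.
        ses.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("fixed-rate  at " + System.currentTimeMillis() / 1000 + "s");
                try { Thread.sleep(1000); } catch (InterruptedException e) { }
            }
        }, 0, 2, TimeUnit.SECONDS);

        // Delay counted from the end of one run to the start of the next: roughly every 3 seconds.
        ses.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                System.out.println("fixed-delay at " + System.currentTimeMillis() / 1000 + "s");
                try { Thread.sleep(1000); } catch (InterruptedException e) { }
            }
        }, 0, 2, TimeUnit.SECONDS);
    }
}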

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
 1 package com.threadpool.test;
 2 
 3 import java.util.concurrent.Executors;
 4 import java.util.concurrent.ScheduledExecutorService;
 5 import java.util.concurrent.TimeUnit;
 6 
 7 public class NewScheduledThreadPoolTest {
 8     public static void main(String args[]) {
 9 
10         ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
11         ses.scheduleAtFixedRate(new Runnable() {
12             public void run() {
13                 try {
14                     Thread.sleep(4000);
15                     System.out.println(Thread.currentThread().getId() + "執行了");
16                 } catch (InterruptedException e) {
17                     e.printStackTrace();
18                 }
19             }
20         }, 0, 2, TimeUnit.SECONDS);
21     }
22 }
Sample output:
 1 9執行了
 2 9執行了
 3 11執行了
 4 9執行了
 5 12執行了
 6 11執行了
 7 13執行了
 8 9執行了
 9 14執行了
10 12執行了
11 15執行了
12 11執行了
13 16執行了
14 13執行了
15 17執行了
16 9執行了
17 18執行了
18 14執行了
19 19執行了
20 12執行了
21 15執行了
22 11執行了
23 16執行了
24 13執行了
25 17執行了
26 17執行了
27 18執行了
28 。。。。

4. Summary

  At its heart, a thread pool is an application of the idea of reuse: by reusing threads it saves memory and CPU, and the same idea of reusing expensive resources is a valuable guide for everyday programming.

