線程池可以對線程進行統一分配,調優和監控:
- 下降資源消耗(防止線程不停的建立與銷燬,減小了資源消耗)
- 提升響應速度
- 提升線程的可管理性java
源碼內部使用了一個Integer類型的原子變量來記錄線程池狀態(高三位)和線程池線程數(其他)。數組
狀態 | 高三位 | 表現 |
---|---|---|
RUNNING | -1(111) | 接收並容許新任務 |
SHUTDOWN | 0(000) | 拒絕新任務,但處理阻塞隊列裏的任務 |
STOP | 1(001) | 拒絕新任務,放棄執行任務,並中斷正在處理的任務 |
TIDYING | 2(010) | 全部任務執行完後,當前線程池活動數爲0,將要調用 terminated 方法 |
TERMINATED | 3(011) | terminated()方法執行完畢 |
//原子變量 ctl 存儲線程池狀態(高三位)與線程數(其餘低位) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程數的表示位數 private static final int COUNT_BITS = Integer.SIZE - 3; //線程最大個數 private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //111 - 000~ private static final int RUNNING = -1 << COUNT_BITS; //000 - 000~ 拒絕新任務,但處理阻塞隊列裏的任務 private static final int SHUTDOWN = 0 << COUNT_BITS; //001 - 000~ 放棄執行任務,並中斷正在處理的任務 private static final int STOP = 1 << COUNT_BITS; //010 - 000~ 全部任務執行完後,當前線程池活動數爲0,將要調用 terminated 方法 private static final int TIDYING = 2 << COUNT_BITS; //011 - 000~ 終止狀態 private static final int TERMINATED = 3 << COUNT_BITS; //取高三位 private static int runStateOf(int c) { return c & ~COUNT_MASK; } //取低其餘位 private static int workerCountOf(int c) { return c & COUNT_MASK; } //獲取 ctl 值 private static int ctlOf(int rs, int wc) { return rs | wc; } //=========================分割線 private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<>(); private final Condition termination = mainLock.newCondition(); private volatile boolean allowCoreThreadTimeOut; private int largestPoolSize;//歷史最大建立線程數 private long completedTaskCount;//完成的任務數 //-------------------------分割線 //構造方法相關的參數 private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile long keepAliveTime; //阻塞隊列 private final BlockingQueue<Runnable> workQueue; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler;
public ThreadPoolExecutor(int corePoolSize,//核心線程數 int maximumPoolSize,//最大線程數 long keepAliveTime,//線程空閒至銷燬等待時間 TimeUnit unit,//時間單位 BlockingQueue<Runnable> workQueue,//工做隊列 ThreadFactory threadFactory,//線程工廠 RejectedExecutionHandler handler//飽和策略 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
拒絕策略 | 表現 |
---|---|
ThreadPoolExecutor.AbortPolicy() 默認 | 丟棄任務並拋出RejectedExecutionException異常 |
ThreadPoolExecutor.CallerRunsPolicy() | 由調用線程(提交任務的線程)處理該任務 |
ThreadPoolExecutor.DiscardOldestPolicy() | 丟棄隊列最前面的任務,而後提交當前任務 |
ThreadPoolExecutor.DiscardPolicy() | 丟棄任務,可是不拋出異常。 |
隊列名 | 表現 |
---|---|
ArrayBlockingQueue | 基於數組的有界隊列 |
LinkedBlockingQueue | 基於鏈表的無界隊列 |
SynchronousQueue | 最多隻有一個元素的隊列 |
PriorityBlockingQueue | 優先級隊列![]() |
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 1.是否比核心線程數少,是則新建一個核心線程完成該任務 * 2.嘗試添加到阻塞隊列 * 3.嘗試新建線程完成任務,失敗則線程池已滿或關閉,執行拒絕策略 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //1 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //2 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false))//3 reject(command); } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { /* 檢查隊列是否只在必要時爲空,返回false * 1.若是線程池狀態爲 shutdown 之後的狀態 * 2.若是線程池狀態爲 shutdown 而且有了第一個任務 * 3.若是線程池狀態爲 shutdown 且任務隊列爲空 */ if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); //線程池正在運行 或 shutdown狀態且第一個任務爲空 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // 線程處於運行態,狀態錯誤 precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//添加任務 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//啓動任務 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
方法名 | 表現 |
---|---|
public void shutdown(); | 線程池再也不接收新任務,但還會完成工做隊列裏的任務 |
public List<Runnable> shutdownNow(); | 線程池再也不接收新任務,且會放棄工做隊列的任務,正在執行的任務會被中斷,返回被丟棄任務的集合 |
public boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; | 當線程調用該方法後,當前線程會被阻塞,直到線程池狀態變成 TERMINATED 或超時才返回 |