public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //poolSize大於等於corePoolSize時不增長線程,反之新初始化線程 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //線程執行狀態外爲執行,同時能夠添加到隊列中 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } //poolSize大於等於corePoolSize時,新初始化線程 else if (!addIfUnderMaximumPoolSize(command)) //沒法添加初始化執行線程,怎麼執行reject操做(調用RejectedExecutionHandler) reject(command); // is shutdown or saturated } }
private final class Worker implements Runnable { /** * Runs a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { /* * If pool is stopping ensure thread is interrupted; * if not, ensure thread is not interrupted. This requires * a double-check of state in case the interrupt was * cleared concurrently with a shutdownNow -- if so, * the interrupt is re-enabled. */ //當線程池的執行狀態爲關閉等,則執行當前線程的interrupt()操做 if ((runState >= STOP || (Thread.interrupted() && runState >= STOP)) && hasRun) thread.interrupt(); /* * Track execution state to ensure that afterExecute * is called only if task completed or threw * exception. Otherwise, the caught runtime exception * will have been thrown by afterExecute itself, in * which case we don't want to call it again. */ boolean ran = false; beforeExecute(thread, task); try { //任務執行 task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * Main run loop */ public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; //判斷是否存在須要執行的任務 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { //若是沒有,則將工做線程移除,當poolSize爲0是則嘗試關閉線程池 workerDone(this); } } } /* Utilities for worker thread control */ /** * Gets the next task for a worker thread to run. The general * approach is similar to execute() in that worker threads trying * to get a task to run do so on the basis of prevailing state * accessed outside of locks. This may cause them to choose the * "wrong" action, such as trying to exit because no tasks * appear to be available, or entering a take when the pool is in * the process of being shut down. These potential problems are * countered by (1) rechecking pool state (in workerCanExit) * before giving up, and (2) interrupting other workers upon * shutdown, so they can recheck state. All other user-based state * changes (to allowCoreThreadTimeOut etc) are OK even when * performed asynchronously wrt getTask. * * @return the task */ Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); //當線程池大於corePoolSize,同時,存在執行超時時間,則等待相應時間,拿出隊列中的線程 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else //阻塞等待隊列中能夠取到新線程 r = workQueue.take(); if (r != null) return r; //判斷線程池運行狀態,若是大於corePoolSize,或者線程隊列爲空,也或者線程池爲終止的工做線程能夠銷燬 if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } } /** * Performs bookkeeping for an exiting worker thread. * @param w the worker */ //記錄執行任務數量,將工做線程移除,當poolSize爲0是則嘗試關閉線程池 void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); if (--poolSize == 0) tryTerminate(); } finally { mainLock.unlock(); } }
線程保有量,線程池總永久保存執行線程的數量 java
最大線程量,線程最多不能超過此屬性設置的數量,當大於線程保有量後,會新啓動線程來知足線程執行。 緩存
獲取隊列中任務的超時時間,當閾值時間內沒法獲取線程,則會銷燬處理線程,前提是線程數量在corePoolSize 以上 多線程
執行隊列是針對任務的緩存,任務在提交至線程池時,都會壓入到執行隊列中。因此這裏你們最好設置下隊列的上限,防止溢出 app
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }