ThreadPoolExecutor 基本使用參考:ThreadPoolExecutor執行過程分析
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 保存了線程池的運行狀態(runState)和線程池內有效線程數量(workerCount)。java
// Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
用 ctl 的高3位來表示線程池的運行狀態, 用低29位來表示線程池內有效線程的數量。ctlOf() 方法用於計算出ctl的值。runStateOf()和workerCountOf()方法分別經過CAPACITY來計算獲得其runState和workerCount,CAPACITY=29個1。git
線程池的運行狀態:github
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //shutdown() -> SHUTDONW , 不加新任務,繼續執行阻塞隊列中的任務 private static final int SHUTDOWN = 0 << COUNT_BITS; //shutdownNow() -> STOP, 中斷一切操做。 private static final int STOP = 1 << COUNT_BITS; //線程池沒有線程,阻塞隊列沒有任務 -> TIDYING private static final int TIDYING = 2 << COUNT_BITS; //terminated() -> TERMINATED private static final int TERMINATED = 3 << COUNT_BITS;
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //若是線程池中線程數沒有達到corePoolSize,則新增線程(worker) if (addWorker(command, true)) return; //更新c值。 c = ctl.get(); } //線程池處於RUNNING狀態,而且阻塞隊列未滿 //workQueue.offer(command)是非阻塞方法,當隊列滿時直接返回false(例如,SynchronousQueue若是沒有線程在阻塞take,則返回false) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次檢查狀態,若是發現不是RUNNING狀態,則remove掉剛纔offer的任務。 if (! isRunning(recheck) && remove(command)) reject(command); //若是有效線程數==0,添加一個線程,而不去啓動它。?? //怎麼會==0? else if (workerCountOf(recheck) == 0) addWorker(null, false); } //若是不是RUNNING狀態,或者阻塞隊列已滿,則添加線程 //若是不能添加,則reject。 //false 表示添加的線程屬於maximumPoolSize,若是線程數已經達到maximumPoolSize,則reject else if (!addWorker(command, false)) reject(command); }
BlockingQueue
的一些操做方法less
拋出異常 特殊值 阻塞 超時 插入 add(e)
offer(e)
put(e)
offer(e, time, unit)
移除 remove()
poll()
take()
poll(time, unit)
檢查 element()
peek()
不可用 不可用
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //1. 處於 STOP, TYDING 或 TERMINATD 狀態 而且 //2. 不是SUHTDOWN 或者 firsttask != null 或 queue不爲空 return false; for (;;) { int wc = workerCountOf(c); //wc大於最大容量。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //沒有空餘的線程了。 return false; //有效線程數加一,加一成功後break if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //runState改變,從頭執行邏輯。 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop //else runState 沒變,從新去執行加一操做。 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //添加成功,啓動線程 //啓動後執行runWorker(this); t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
運行worker,該線程不斷的getTask()從隊列中獲取任務,而後 task.run();運行。只要隊列中有值則不斷循環。ide
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //getTask()方法是個無限循環, 會從阻塞隊列 workQueue中不斷取出任務來執行. //addWorker(null, false);狀況,task==null,這樣就須要getTask從隊列中取任務執行(本身不帶任務)。直到getTask返回null while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //執行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // STOP以上狀態,或者SHUTDOWN狀態下queue爲空,即都沒有任務要執行了。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //線程數減一 decrementWorkerCount(); //該線程退出。 return null; } //下面都是RUNNING狀態,或SHUTDOWN狀態queue!=null int wc = workerCountOf(c); // Are workers subject to culling? //設置了allowCoreThreadTimeOut,或者線程數大於core線程數。 //是否剔除超時的線程? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 經過返回 null 結束線程。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //線程已經準備好,正在take(),沒有什麼標誌位? //取出runnable 返回 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
考慮到當線程池滿時(任務數 > maximumPoolSize + Queue.size()),會執行飽和策略。默認AbortPolicy ,拋出RejectedExecutionException。oop
怎麼能避免線程池拒絕提交的任務呢?首先想到經過信號量Semaphore來控制任務的添加。代碼以下:源碼分析
注意:該代碼是無效的。ui
Semaphore semaphore; /** * 使用semaphore,控制提交任務速度 * @throws InterruptedException * @throws ExecutionException */ @Test public void test555() throws InterruptedException, ExecutionException { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 7, 10, TimeUnit.SECONDS, new SynchronousQueue<>()); //信號量設置爲線程池最大線程數 semaphore = new Semaphore(threadPoolExecutor.getMaximumPoolSize()); ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService(threadPoolExecutor); Runnable runnable = new Runnable() { @Override public void run() { for (int i = 0; i < 50; i++) { String name = "name_" + i; TestCallable testCallable = new TestCallable(name); try { //RetryUtil.createThreadPoolExecutor() semaphore.acquire(); executorCompletionService.submit(testCallable); logger.info("+++添加任務 name: " + name + poolInfo(threadPoolExecutor)); //threadPoolExecutor.submit(testCallable); } catch (RejectedExecutionException e) { logger.info("拒絕:" + name); } catch (InterruptedException e) { e.printStackTrace(); } try { //添加任務間隔200ms Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } finishState = 1; } }; Thread addThread = new Thread(runnable); addThread.start(); //logger.info(" taskCount: " + threadPoolExecutor.getTaskCount()); //添加的任務有被拋棄的。taskCount不必定等於添加的任務。 int completeCount = 0; while (!(completeCount == threadPoolExecutor.getTaskCount() && finishState == 1)) { Future<String> take = executorCompletionService.take(); String taskName = null; try { taskName = take.get(); //有可能線程池還沒準備好? semaphore.release(); System.out.println("???" + take.isDone()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { logger.info(e.getMessage()); } logger.info("---完成任務 name: " + taskName + poolInfo(threadPoolExecutor) + " finishTask:" + (++completeCount)); } addThread.join(); while (threadPoolExecutor.getPoolSize() > 0) { Thread.sleep(1000); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss"); logger.info(simpleDateFormat.format(new Date()) + poolInfo(threadPoolExecutor)); } // Tell threads to finish off. threadPoolExecutor.shutdown(); // Wait for everything to finish. while (!threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS)) { logger.info("complete"); } } public String poolInfo(ThreadPoolExecutor threadPoolExecutor) { return " ActiveCount: " + threadPoolExecutor.getActiveCount() + " poolSize: " + threadPoolExecutor.getPoolSize() + " queueSize: " + threadPoolExecutor.getQueue().size() + " taskCount: " + threadPoolExecutor.getTaskCount(); }
只是在submit以前添加semaphore.acquire(); 在獲取future後,添加semaphore.release();。this
但這樣依然會產生RejectedExecutionException。atom
經過源碼分析緣由,
當線程池中線程已滿,而且都處於忙碌狀態。此時semaphore的值==線程池線程數,addThread被semaphore.acquire()阻塞,禁止submit新任務。當線程池中一個線程t1執行了runWorker(Worker w)中的task.run(),main線程就能夠執行Future<String> take = executorCompletionService.take()獲取結果並semaphore.release()釋放信號量。
釋放信號量semaphore後,addThread線程能夠submit新任務,假設此時t1線程尚未執行到getTask() 中的poll()和take()方法。此時workQueue隊列依然是滿的。
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
而addThread已經執行到execute()的
if (isRunning(c) && workQueue.offer(command)) {
當workQueue已滿,offer() 直接返回false(正確的順序應該是等t1線程執行到workQueue.take()後addThread再開始執行workQueue.offer(command)。)。執行execute() 以下邏輯
else if (!addWorker(command, false)) reject(command);
addWork()中,wc = maximumPoolSize 返回false。
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //沒有空餘的線程了。 return false;
執行reject(),拋出RejectedExecutionException。
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
其思想就是替換BlockingQueue中的offer()方法爲put()方法,這樣execute() 中的workQueue.offer(command),就變成put(),阻塞添加任務,不會存在workQueue.offer() 返回false的狀況。
//void execute(Runnable command) 中代碼 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //一下代碼,沒法執行 else if (!addWorker(command, false)) reject(command);
但這樣的問題是下面的else if (!addWorker(command, false)) 代碼邏輯將沒法執行,致使的結果就是,只針對corePoolSize==maxPoolSize 時有效。不建議這麼作。
RejectedExecutionHandler block = new RejectedExecutionHandler() { rejectedExecution(Runnable r, ThreadPoolExecutor executor) { executor.getQueue().put( r ); } }; ThreadPoolExecutor pool = new ... pool.setRejectedExecutionHandler(block);
經過自定義RejectedExecutionHandler,在reject時調用Queue的put()方法,阻塞式添加任務。
其實忙活一圈,發現最簡單的方式就是使用ThreadPoolExecutor.CallerRunsPolicy。
CallerRunsPolicy被拒絕的任務,誰submit的誰執行。想一想以前的各類阻塞也對,負責添加任務的線程由於線程池滿了就阻塞在那裏,還不如幫着執行一些任務..