package sample; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /* * * Executors 類使用 ExecutorService 提供了一個 ThreadPoolExecutor 的簡單實現, * 但 ThreadPoolExecutor 提供的功能遠不止這些。 * 咱們能夠指定建立 ThreadPoolExecutor 實例時活躍的線程數,而且能夠限制線程池的大小, * 還能夠建立本身的 RejectedExecutionHandler 實現來處理不適合放在工做隊列裏的任務。 * */ public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { /** * ThreadPoolExecutor 提供了一些方法,能夠查看執行狀態、線程池大小、 * 活動線程數和任務數。因此,我經過一個監視線程在固定間隔輸出執行信息。 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); } }
package sample; import java.util.concurrent.ThreadPoolExecutor; public class MonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run = true; public MonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds = delay; } public void shutdown() { this.run = false; } public void run() { while (run) { System.out .println(String .format( "[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor .getCorePoolSize(), this.executor .getActiveCount(), this.executor .getCompletedTaskCount(), this.executor.getTaskCount(), this.executor .isShutdown(), this.executor .isTerminated())); try { Thread.sleep(seconds * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package sample; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class WorkerPoolAdvanced { public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor /** * 請注意:在初始化 ThreadPoolExecutor 時,初始線程池大小設爲二、最大值設爲四、工做隊列大小設爲2。 * 因此,若是當前有4個任務正在運行而此時又有新任務提交, * 工做隊列將只存儲2個任務和其餘任務將交由RejectedExecutionHandlerImpl 處理。 */ ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); //start the monitoring thread MonitorThread monitor = new MonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for (int i = 1; i < 11; i++) { executorPool.execute(new WorkerThread("Worker Thread No: " + i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); } }