線程池(如線程數超出線程池容量,返回消息提示用戶)

package sample;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/*
 * 
 * Executors 類使用 ExecutorService  提供了一個 ThreadPoolExecutor 的簡單實現,
 * 但 ThreadPoolExecutor 提供的功能遠不止這些。
 * 咱們能夠指定建立 ThreadPoolExecutor 實例時活躍的線程數,而且能夠限制線程池的大小,
 * 還能夠建立本身的 RejectedExecutionHandler 實現來處理不適合放在工做隊列裏的任務。
 * 
 */

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
    /**
     * ThreadPoolExecutor 提供了一些方法,能夠查看執行狀態、線程池大小、
     * 活動線程數和任務數。因此,我經過一個監視線程在固定間隔輸出執行信息。
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " is rejected");
    }

}
package sample;

import java.util.concurrent.ThreadPoolExecutor;

public class MonitorThread implements Runnable {
    private ThreadPoolExecutor executor;

    private int seconds;

    private boolean run = true;

    public MonitorThread(ThreadPoolExecutor executor, int delay) {
        this.executor = executor;
        this.seconds = delay;
    }

    public void shutdown() {
        this.run = false;
    }

    public void run() {
        while (run) {
            System.out
                    .println(String
                            .format(
                                    "[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                                    this.executor.getPoolSize(), this.executor
                                            .getCorePoolSize(), this.executor
                                            .getActiveCount(), this.executor
                                            .getCompletedTaskCount(),
                                    this.executor.getTaskCount(), this.executor
                                            .isShutdown(), this.executor
                                            .isTerminated()));
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
package sample;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class WorkerPoolAdvanced {
 
    public static void main(String args[]) throws InterruptedException{
        //RejectedExecutionHandler implementation
        RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandlerImpl();
        //Get the ThreadFactory implementation to use
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //creating the ThreadPoolExecutor
        /**
         * 請注意:在初始化 ThreadPoolExecutor 時,初始線程池大小設爲二、最大值設爲四、工做隊列大小設爲2。
         * 因此,若是當前有4個任務正在運行而此時又有新任務提交,
         * 工做隊列將只存儲2個任務和其餘任務將交由RejectedExecutionHandlerImpl 處理。
         */
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
        //start the monitoring thread
        MonitorThread monitor = new MonitorThread(executorPool, 3);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();
        //submit work to the thread pool
        for (int i = 1; i < 11; i++) {
            executorPool.execute(new WorkerThread("Worker Thread No: " + i));
        }
 
        Thread.sleep(30000);
        //shut down the pool
        executorPool.shutdown();
        //shut down the monitor thread
        Thread.sleep(5000);
        monitor.shutdown();
 
    }
}
相關文章
相關標籤/搜索