關於ThreadPoolExecutor 調用RejectedExecutionHandler的機制

        當咱們建立線程池而且提交任務失敗時,線程池會回調RejectedExecutionHandler接口的rejectedExecution(Runnable task, ThreadPoolExecutor executor)方法來處理線程池處理失敗的任務,其中task 是用戶提交的任務,而executor是當前執行的任務的線程池。能夠經過代碼的方式來驗證。 java

一、線程池工廠: ide


package com.threadpool;

import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 線程池工廠方法
 * @author 
 *
 */
public class ThreadPoolFactory {
	
	//線程池
	private static ThreadPoolExecutor  pool;	
	//自身對象
	private static ThreadPoolFactory factory;
	
	/**
	 * 私有構造函數
	 */
	private ThreadPoolFactory(){	}
	
	/**
	 * 獲取工廠對象
	 * @param config
	 * @return
	 */
	public static ThreadPoolFactory getInstance(ThreadPoolConfig config){
		if(factory == null){
			factory = new ThreadPoolFactory();
		}
		
		if(pool == null){
			
			if(config.getHandler() == null){
				pool = new ThreadPoolExecutor(config.getCorePoolSize(),
						config.getMaximumPoolSize(),config.getKeepAliveTime(),
						config.getUnit(),config.getWorkQueue());
			}else{
				pool = new ThreadPoolExecutor(config.getCorePoolSize(),
						config.getMaximumPoolSize(),config.getKeepAliveTime(),
						config.getUnit(),config.getWorkQueue(),config.getHandler());
			}
		}		
		System.out.println("pool  create= "+pool.toString());
		return factory;
	}
	
	/**
	 * 添加線程池任務
	 * @param run
	 */
	public synchronized void addTask(Runnable run){
		pool.execute(run);
	}
	
	/**
	 * 添加線程池任務
	 * @param runs
	 */
	public synchronized void addTask(List<Runnable> runs){
		if(runs != null){
			for(Runnable r:runs){
				this.addTask(r);
			}
		}
	}
	
	/**
	 * 關閉線程池
	 */
	public void closePool(){
		pool.shutdown();
	}
	
}
二、線程池配置文件類:



package com.threadpool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;

/**
 * 線程池配置類
 * @author 
 *
 */
public class ThreadPoolConfig {
	//池中所保存的線程數,包括空閒線程。
	private int corePoolSize;
	//池中容許的最大線程數。
	private int maximumPoolSize;
	//當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。
	private long keepAliveTime; 
	//參數的時間單位。
	private TimeUnit unit;
	//執行前用於保持任務的隊列。此隊列僅由保持 execute 方法提交的 Runnable 任務。
	private BlockingQueue<Runnable> workQueue;
	//因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序。 
	private RejectedExecutionHandler handler;
	//配置文件自身對象
	private static ThreadPoolConfig config;
	/**
	 * 單例模式
	 */
	private ThreadPoolConfig(){
		
	}
	
	/**
	 * 獲取配置文件對象
	 * @return
	 */
	public static ThreadPoolConfig getInstance(){
		if(config == null){
			config = new ThreadPoolConfig();
		}		
		return config;
	}	
	public int getCorePoolSize() {
		return corePoolSize;
	}
	public void setCorePoolSize(int corePoolSize) {
		this.corePoolSize = corePoolSize;
	}
	public int getMaximumPoolSize() {
		return maximumPoolSize;
	}
	public void setMaximumPoolSize(int maximumPoolSize) {
		this.maximumPoolSize = maximumPoolSize;
	}
	public long getKeepAliveTime() {
		return keepAliveTime;
	}
	public void setKeepAliveTime(long keepAliveTime) {
		this.keepAliveTime = keepAliveTime;
	}
	public TimeUnit getUnit() {
		return unit;
	}
	public void setUnit(TimeUnit unit) {
		this.unit = unit;
	}
	public BlockingQueue<Runnable> getWorkQueue() {
		return workQueue;
	}
	public void setWorkQueue(BlockingQueue<Runnable> workQueue) {
		this.workQueue = workQueue;
	}
	public RejectedExecutionHandler getHandler() {
		return handler;
	}
	public void setHandler(RejectedExecutionHandler handler) {
		this.handler = handler;
	}	
}
三、簡單任務類:



package com.test;

/**
 * 任務線程
 * @author 
 *
 */
public class ThreadTask extends Thread {
	
	public ThreadTask(String name){
		super(name);
	}
	
	@SuppressWarnings("static-access")
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println(this.getName().toString() + ", will sleep 0 s");
		try {
			this.sleep(1*10);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(this.getName().toString() + ", I am wakeup now ");
	}

}

四、異常處理接口實現類: 函數

package com.threadpool;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 線程池異常處理類
 * @author 
 *
 */
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {

	@Override
	public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
		// TODO Auto-generated method stub
		System.out.println("Begin exception handler-----------");
		//執行失敗任務
		new Thread(task,"exception by pool").start();
		//打印線程池的對象
		System.out.println("The pool RejectedExecutionHandler = "+executor.toString());
	}
}
五、測試主函數:
package com.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.threadpool.MyRejectedExecutionHandler;
import com.threadpool.ThreadPoolConfig;
import com.threadpool.ThreadPoolFactory;

/**
 * @author 
 *
 */
public class TestThreadPoolMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		//設置配置
		ThreadPoolConfig config = ThreadPoolConfig.getInstance();
		config.setCorePoolSize(2);
		config.setMaximumPoolSize(3);
		config.setKeepAliveTime(5);
		config.setUnit(TimeUnit.SECONDS);
		//將隊列設小,會拋異常
		config.setWorkQueue(new ArrayBlockingQueue<Runnable>(10));
		config.setHandler(new MyRejectedExecutionHandler());
		//線程池工廠
		ThreadPoolFactory factory = ThreadPoolFactory.getInstance(config);
		
		for(int i = 0;i<100;i++){
			factory.addTask(new ThreadTask(i+"-i"));
		}
		System.out.println("i add is over!-------------------");
	}
}
六、測試比較:

能夠看出建立的線程池對象和調用傳遞的線程池對象是相同的。 測試

pool create = java.util.concurrent.ThreadPoolExecutor@de6f34
0-i, will sleep 0 s
Begin exception handler-----------
12-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34
Begin exception handler-----------
1-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34 this

相關文章
相關標籤/搜索