本身手動實現一個簡單的線程池

前言:

     線程池,在個人理解當中,實際上是典型的消費者生產者模型

     咱們實現簡單的線程池,其實不難,java的併發庫中,有咱們能夠直接拿來用的阻塞隊列使用,用來存儲任務以及消費者,而不須要咱們作額外的同步跟阻塞操做,而消費者會經過自旋的形式,不斷的從任務阻塞隊列獲取任務,若是沒有獲取到任務,則阻塞,直到有任務來進行消費,下面是代碼

 

 

 

-----------------------------------我是分割線-----------------------------------java

 

首先,咱們定義一個接口,定義下,線程池的一些簡單操做,下面是代碼併發

 

public interface Pool {

	/**
	 * 建立人:賀小五
	 * 建立時間:2017-08-19 00:21:53
	 * 描述:
	 * 		添加任務
	 */
	void execute(Runnable runnable);

	/**
	 * 建立人:賀小五
	 * 建立時間:2017-08-19 00:22:03
	 * 描述:
	 * 		中止線程池(讓線程執行完任務在中止)
	 */
	void shutDown();

	/**
	 * 建立人:賀小五
	 * 建立時間:2017-08-19 00:22:39
	 * 描述:
	 * 		添加工做者
	 */
	void addWorker(int num);

	/**
	 * 建立人:賀小五
	 * 建立時間:2017-08-19 00:22:55
	 * 描述:
	 * 		移除工做者
	 */
	void removeWorker(int num);

	/**
	 * 建立人:賀小五
	 * 建立時間:2017-08-19 00:23:04
	 * 描述:
	 * 		線程池大小
	 */
	int poolSize();

	/**
	 * 建立人:賀小五
	 * 建立時間:2017-08-19 00:23:15
	 * 描述:
	 * 	     中止線程池(無論是否有任務,都中止)
	 */
	void shutDownNow();

}

 

 

既然定義好接口了,咱們來定義一下實現ide

public class DefaultThreadPool implements Pool{

	/**
	 * 使用java併發庫下的阻塞隊列來作,這樣咱們就不須要作額外的同步跟阻塞操做
	 */
	private final BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<>();
	private final BlockingQueue<Worker> workers = new LinkedBlockingQueue<>();


	/**
	 * 建立人:賀小五
	 * 建立時間:2017-08-19 00:24:22
	 * 描述:
	 * 		初始化線程池大小
	 */
	public DefaultThreadPool(int num) {
		initPool(num);
	}

	@Override
	public int poolSize() {
		return workers.size();
	}


	private void initPool(int num){
		for (int i = 0; i < num; i++) {
			Worker worker = new Worker();
			workers.add(worker);
			worker.start();
		}
	}

	@Override
	public void execute(Runnable runnable) {
		if(runnable!=null){
			jobs.add(runnable);
		}
	}

	@Override
	public void shutDown() {
		/**
		 * 經過不斷的循環來判斷,任務隊列是否已經清空,
		 * 若是隊列任務清空了,將工做者隊列的線程中止
		 * 打破循環,清空工做者隊列
		 */
		while(true){
			if(jobs.size()==0){
				for (Worker worker : workers) {
					worker.stopRunning();
				}
				break;
			}
		}
		workers.clear();
	}

	@Override
	public void shutDownNow() {
		/**
		 * 清空任務隊列,而後調用中止線程池的方法
		 */
		jobs.clear();
		shutDown();
	}

	@Override
	public void addWorker(int num){
		/**
		 *  添加新的工做者到工做者隊列尾部
		 */
		for (int i = 0; i < num; i++) {
			Worker worker = new Worker();
			workers.offer(worker);
			worker.start();
		}
	}

	@Override
	public void removeWorker(int num) {
		/**
		 * 移除工做者阻塞隊列頭部的線程
		 */
		for (int i = 0; i < num; i++) {
			try {
				workers.take().stopRunning();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}



	private class Worker extends Thread{

		//經過 volatile修飾的變量,保證變量的可見性,從而能讓線程立刻得知狀態
		private volatile boolean isRunning = true;


		@Override
		public void run() {
			//經過自旋不停的從任務隊列中獲取任務,
			while (isRunning){
				Runnable runnable = null;

				try {
					//若是工做隊列爲空,則紫色
					runnable = jobs.take();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}

				if(runnable!=null){
					System.out.print(getName()+"-->");
					runnable.run();
				}
				// 睡眠 100毫秒,驗證 shutdown 是不是在任務執行完畢後纔會關閉線程池
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			System.out.println(getName()+"銷燬...");
		}

		public void stopRunning(){
			this.isRunning = false;
		}
	}
}

 

 

上面接口實現很簡單,工做者經過私有內部類,繼承Thread 類來當工做者,內部定義兩個阻塞隊列,用於存儲任務跟工做者,其它線程最大數,最小數,任務隊列最大數暫時不定義了,這只是一個簡單的實現,各位看官有興趣,能夠本身進行擴展測試

 

 

接下來就是測試代碼,來驗證下接口定義的方法this

public class PoolTest {
	public static void main(String[] args){
        //構建一個只有10個線程的線程池
		Pool pool = new DefaultThreadPool(10);

		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		//放500個任務進去,讓線程池進行消費
		for (int i = 0; i < 500; i++) {
			int finalI = i;
			pool.execute(new Runnable() {
				@Override
				public void run() {
					System.out.println("打印數字:"+ finalI);
				}
			});
		}

		/**
		 * 驗證線程池的消費完任務中止以及不等任務隊列清空就中止任務
		 */
		System.out.println("中止線程池");
		pool.shutDown();
		//pool.shutDownNow();

		/**
		 * 移除2個工做者
		 */
		//pool.removeWorker(2);
		//System.out.println("線程池大小:"+pool.poolSize());

		/**
		 * 添加5個工做者
		 */
		//pool.addWorker(5);
		//System.out.println("線程池大小:"+pool.poolSize());

	}
}

 

 

 

以上,就是本身動手實現的一個簡單線程池,經過自旋鎖,讓工做線程不停的從任務隊列獲取任務,很是簡單的實現,若是不想使用java提供的阻塞隊列,想本身作,可使用 java關鍵字 synchronize 配合 wait() notify()方法來實現或者是 lock接口配合 condition的 await()跟signal() 方法來實現一個簡單的阻塞隊列

 

 

 

到這,文章就結束了!spa

以上,均爲本人我的理解,比較簡單的理解,或許跟各位看官理解的有出入,歡迎指正交流線程

歡迎轉載,請註明出處跟做者,謝謝!code

相關文章
相關標籤/搜索