前言:
線程池,在個人理解當中,實際上是典型的消費者生產者模型
咱們實現簡單的線程池,其實不難,java的併發庫中,有咱們能夠直接拿來用的阻塞隊列使用,用來存儲任務以及消費者,而不須要咱們作額外的同步跟阻塞操做,而消費者會經過自旋的形式,不斷的從任務阻塞隊列獲取任務,若是沒有獲取到任務,則阻塞,直到有任務來進行消費,下面是代碼
-----------------------------------我是分割線-----------------------------------java
首先,咱們定義一個接口,定義下,線程池的一些簡單操做,下面是代碼併發
public interface Pool { /** * 建立人:賀小五 * 建立時間:2017-08-19 00:21:53 * 描述: * 添加任務 */ void execute(Runnable runnable); /** * 建立人:賀小五 * 建立時間:2017-08-19 00:22:03 * 描述: * 中止線程池(讓線程執行完任務在中止) */ void shutDown(); /** * 建立人:賀小五 * 建立時間:2017-08-19 00:22:39 * 描述: * 添加工做者 */ void addWorker(int num); /** * 建立人:賀小五 * 建立時間:2017-08-19 00:22:55 * 描述: * 移除工做者 */ void removeWorker(int num); /** * 建立人:賀小五 * 建立時間:2017-08-19 00:23:04 * 描述: * 線程池大小 */ int poolSize(); /** * 建立人:賀小五 * 建立時間:2017-08-19 00:23:15 * 描述: * 中止線程池(無論是否有任務,都中止) */ void shutDownNow(); }
既然定義好接口了,咱們來定義一下實現ide
public class DefaultThreadPool implements Pool{ /** * 使用java併發庫下的阻塞隊列來作,這樣咱們就不須要作額外的同步跟阻塞操做 */ private final BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<>(); private final BlockingQueue<Worker> workers = new LinkedBlockingQueue<>(); /** * 建立人:賀小五 * 建立時間:2017-08-19 00:24:22 * 描述: * 初始化線程池大小 */ public DefaultThreadPool(int num) { initPool(num); } @Override public int poolSize() { return workers.size(); } private void initPool(int num){ for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); worker.start(); } } @Override public void execute(Runnable runnable) { if(runnable!=null){ jobs.add(runnable); } } @Override public void shutDown() { /** * 經過不斷的循環來判斷,任務隊列是否已經清空, * 若是隊列任務清空了,將工做者隊列的線程中止 * 打破循環,清空工做者隊列 */ while(true){ if(jobs.size()==0){ for (Worker worker : workers) { worker.stopRunning(); } break; } } workers.clear(); } @Override public void shutDownNow() { /** * 清空任務隊列,而後調用中止線程池的方法 */ jobs.clear(); shutDown(); } @Override public void addWorker(int num){ /** * 添加新的工做者到工做者隊列尾部 */ for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.offer(worker); worker.start(); } } @Override public void removeWorker(int num) { /** * 移除工做者阻塞隊列頭部的線程 */ for (int i = 0; i < num; i++) { try { workers.take().stopRunning(); } catch (InterruptedException e) { e.printStackTrace(); } } } private class Worker extends Thread{ //經過 volatile修飾的變量,保證變量的可見性,從而能讓線程立刻得知狀態 private volatile boolean isRunning = true; @Override public void run() { //經過自旋不停的從任務隊列中獲取任務, while (isRunning){ Runnable runnable = null; try { //若是工做隊列爲空,則紫色 runnable = jobs.take(); } catch (InterruptedException e) { e.printStackTrace(); } if(runnable!=null){ System.out.print(getName()+"-->"); runnable.run(); } // 睡眠 100毫秒,驗證 shutdown 是不是在任務執行完畢後纔會關閉線程池 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(getName()+"銷燬..."); } public void stopRunning(){ this.isRunning = false; } } }
上面接口實現很簡單,工做者經過私有內部類,繼承Thread 類來當工做者,內部定義兩個阻塞隊列,用於存儲任務跟工做者,其它線程最大數,最小數,任務隊列最大數暫時不定義了,這只是一個簡單的實現,各位看官有興趣,能夠本身進行擴展測試
接下來就是測試代碼,來驗證下接口定義的方法this
public class PoolTest { public static void main(String[] args){ //構建一個只有10個線程的線程池 Pool pool = new DefaultThreadPool(10); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //放500個任務進去,讓線程池進行消費 for (int i = 0; i < 500; i++) { int finalI = i; pool.execute(new Runnable() { @Override public void run() { System.out.println("打印數字:"+ finalI); } }); } /** * 驗證線程池的消費完任務中止以及不等任務隊列清空就中止任務 */ System.out.println("中止線程池"); pool.shutDown(); //pool.shutDownNow(); /** * 移除2個工做者 */ //pool.removeWorker(2); //System.out.println("線程池大小:"+pool.poolSize()); /** * 添加5個工做者 */ //pool.addWorker(5); //System.out.println("線程池大小:"+pool.poolSize()); } }
到這,文章就結束了!spa
以上,均爲本人我的理解,比較簡單的理解,或許跟各位看官理解的有出入,歡迎指正交流線程
歡迎轉載,請註明出處跟做者,謝謝!code