初版地址:https://blog.csdn.net/wandou9527/article/details/107769598java
第二版優化點:多線程
java.util.concurrent.Executor
接口,更加符合規範優化後又離完美線程池近了一步。併發
若是想玩轉 Java 的多線程與高併發,線程池是你永遠也繞不過的山。既然繞不過,咱們就啃他,吃透線程池,玩轉高併發。
閱讀Jdk線程池源碼發現,Jdk裏的線程池實現的很是完善,有不少複雜的邏輯處理,因此形成代碼較長,並且代碼格式也不規範(ps. 請原諒我指點江山,人家可能有人家的實際緣由),eg. if 後沒有大括號;不少變量命名都是單字母,好比c、w等。。。ide
本文,精簡了線程池的一些複雜邏輯,從主幹功能出發,實現主幹功能,我相信更有助於咱們理解線程池,而後再一步步深刻。高併發
private volatile int corePoolSize; //核心線程數 private volatile int maximumPoolSize; //最大線程數 private volatile long keepAliveTime; //存活時間 private final BlockingQueue<Runnable> workQueue; //任務等待隊列 private volatile ThreadFactory threadFactory; //線程工廠 private volatile RejectedExecutionHandler handler; //拒絕策略
下面介紹一下線程池執行任務的流程,理解各個屬性的意義。當一個線程池初始化,向線程池提交任務,線程池新建線程執行任務,隨着線程建立,線程數逐漸增多,當達到 corePoolSize
線程池將再也不新建線程,而是將任務放入任務等待隊列 workQueue
。再持續向線程池提交任務,當等待隊列滿了,這時會繼續新建線程,直到到達最大線程數 maximumPoolSize
,若是還繼續有任務到來,線程池沒法處理,這時就啓動拒絕策略。 post
這個過程咱們能夠以生活中的例子比喻一下。大體咱們把線程池理解爲理髮店。那麼流程就是:來了顧客開始理髮,好比只有4個理髮師4個座位,至關於核心線程。那麼來了過多的顧客,理髮師忙不過來就會先讓你去等候區稍等排隊等待,前面有理完髮的會叫你,至關於等待隊列。等待區滿了呢?現實中理髮店確定不會拒絕顧客的啊,他可能讓你先在外面等。但若是等待區天天都爆滿,那麼老闆可能會考慮擴大店面,擴充理髮師團隊了。因此,這只是個大體的比喻。測試
廢話不說,咱們上代碼。優化
package com.wandou.demo.thread.post.threapool; import java.util.HashSet; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; /** * @author liming * @date 2020/09 * @description 自定義線程池 * new功能 * - 線程延遲建立(來任務才建立) * 欠缺功能: * - 隊列滿了後繼續建立線程,直到達到最大線程數 * - 拒絕策略 * - 線程超時銷燬 */ public class MyThreadPool implements Executor { /** * 核心線程數(核心理髮師數量) */ private volatile int corePoolSize; /** * 任務等待隊列(等待區座位數) */ private final BlockingQueue<Runnable> workQueue; /** * 線程容器(理髮師做業區) */ private final HashSet<MyThreadPool.Worker> workers = new HashSet<>(); private final AtomicInteger workerCount = new AtomicInteger(0); public MyThreadPool(int corePoolSize, BlockingQueue<Runnable> workQueue) { this.corePoolSize = corePoolSize; this.workQueue = workQueue; } /** * 運行,來顧客了,安排 * * @param command * @return */ @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } int count = workerCount.get(); // 若是小於核心線程數,能夠創建新線程 if (count < corePoolSize) { if (addWorker(command, true)) { return; } } if (workQueue.offer(command)) { int recheck = workerCount.get(); if (recheck == 0) { addWorker(null, false); } return; } //拒絕 throw new RejectedExecutionException("Task " + command.toString() + " rejected from " + this.toString()); } private boolean addWorker(Runnable command, boolean core) { for (; ; ) { int count = workerCount.get(); if (count >= corePoolSize) { return false; } // 線程數+1, 成功向下走 if (workerCount.compareAndSet(count, count + 1)) { break; } } Worker worker = new Worker(command); final Thread thread = worker.thread; workers.add(worker); thread.start(); return true; } // ---------------- /** * 線程(理髮師) */ private class Worker implements Runnable { /** * 工做者運行的線程 */ final Thread thread; /** * 初始運行的任務,可能爲 null */ Runnable firstTask; /** * 建立一個工做者 * * @param firstTask 第一個任務,能夠爲 null */ Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = new Thread(this); } @Override public void run() { System.out.println("run"); runWorker(this); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Worker worker = (Worker) o; return Objects.equals(thread, worker.thread) && Objects.equals(firstTask, worker.firstTask); } @Override public int hashCode() { return Objects.hash(thread, firstTask); } } final void runWorker(Worker worker) { Runnable task = worker.firstTask; worker.firstTask = null; while (task != null || (task = getTask()) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { // 運行完畢 task = null; } } } private Runnable getTask() { for (; ; ) { try { //阻塞拿任務 return workQueue.take(); } catch (InterruptedException e) { System.out.println("InterruptedException!!!"); } } } }
測試代碼:this
package com.wandou.demo.thread.post.threapool; import org.junit.Test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * @author liming * @date 2020/9/17 * @description */ public class MyThreadPoolDemo { private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); private MyThreadPool myThreadPool = new MyThreadPool(3, workQueue); @Test public void t1() throws Exception { AtomicInteger atomicInteger = new AtomicInteger(0); Runnable task = new Runnable() { @Override public void run() { try { System.out.println(atomicInteger.incrementAndGet() + "號顧客來理髮,爲其理髮的理髮師是:" + Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } } }; for (int i = 0; i < 30; i++) { myThreadPool.execute(task); } Thread.sleep(5000); System.out.println("================================================"); for (int i = 0; i < 30; i++) { myThreadPool.execute(task); } //讓主線程阻塞等待 System.in.read(); } }
測試結果:atom