一個來自一線程序員對Java線程池的理解 手寫線程池 帶你翻過線程池這座山

初版地址:https://blog.csdn.net/wandou9527/article/details/107769598java

第二版優化點:多線程

  • 線程實現延遲建立,對內存更友好
  • 自定義線程池實現java.util.concurrent.Executor 接口,更加符合規範
  • submit方法改成execute方法,語義更貼切
  • 加入對任務(command)的非空判斷

優化後又離完美線程池近了一步。併發

若是想玩轉 Java 的多線程與高併發,線程池是你永遠也繞不過的山。既然繞不過,咱們就啃他,吃透線程池,玩轉高併發。
閱讀Jdk線程池源碼發現,Jdk裏的線程池實現的很是完善,有不少複雜的邏輯處理,因此形成代碼較長,並且代碼格式也不規範(ps. 請原諒我指點江山,人家可能有人家的實際緣由),eg. if 後沒有大括號;不少變量命名都是單字母,好比c、w等。。。ide

本文,精簡了線程池的一些複雜邏輯,從主幹功能出發,實現主幹功能,我相信更有助於咱們理解線程池,而後再一步步深刻。高併發

Jdk裏的線程池

主要屬性

private volatile int corePoolSize; //核心線程數
private volatile int maximumPoolSize; //最大線程數
private volatile long keepAliveTime; //存活時間
private final BlockingQueue<Runnable> workQueue; //任務等待隊列
private volatile ThreadFactory threadFactory; //線程工廠
private volatile RejectedExecutionHandler handler; //拒絕策略

下面介紹一下線程池執行任務的流程,理解各個屬性的意義。當一個線程池初始化,向線程池提交任務,線程池新建線程執行任務,隨着線程建立,線程數逐漸增多,當達到 corePoolSize 線程池將再也不新建線程,而是將任務放入任務等待隊列 workQueue 。再持續向線程池提交任務,當等待隊列滿了,這時會繼續新建線程,直到到達最大線程數 maximumPoolSize,若是還繼續有任務到來,線程池沒法處理,這時就啓動拒絕策略。 post

這個過程咱們能夠以生活中的例子比喻一下。大體咱們把線程池理解爲理髮店。那麼流程就是:來了顧客開始理髮,好比只有4個理髮師4個座位,至關於核心線程。那麼來了過多的顧客,理髮師忙不過來就會先讓你去等候區稍等排隊等待,前面有理完髮的會叫你,至關於等待隊列。等待區滿了呢?現實中理髮店確定不會拒絕顧客的啊,他可能讓你先在外面等。但若是等待區天天都爆滿,那麼老闆可能會考慮擴大店面,擴充理髮師團隊了。因此,這只是個大體的比喻。測試

自定義手寫線程池

廢話不說,咱們上代碼。優化

package com.wandou.demo.thread.post.threapool;

import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author liming
 * @date 2020/09
 * @description 自定義線程池
 * new功能 
 * - 線程延遲建立(來任務才建立) 
 * 欠缺功能: 
 * - 隊列滿了後繼續建立線程,直到達到最大線程數 
 * - 拒絕策略 
 * - 線程超時銷燬 
 */
 
public class MyThreadPool implements Executor {
    /**
     * 核心線程數(核心理髮師數量) 
     */ 
    private volatile int corePoolSize;
    /**
    * 任務等待隊列(等待區座位數) 
    */ 
    private final BlockingQueue<Runnable> workQueue;
    /**
    * 線程容器(理髮師做業區) 
    */ 
    private final HashSet<MyThreadPool.Worker> workers = new HashSet<>();
 
    private final AtomicInteger workerCount = new AtomicInteger(0);
 
    public MyThreadPool(int corePoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.workQueue = workQueue;
    }
    
   /**
    * 運行,來顧客了,安排 
    * 
    * @param command
    * @return
    */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        int count = workerCount.get();
        // 若是小於核心線程數,能夠創建新線程
        if (count < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
        }
        if (workQueue.offer(command)) {
            int recheck = workerCount.get();
            if (recheck == 0) {
                addWorker(null, false);
            }
            return;
        }
        //拒絕
        throw new RejectedExecutionException("Task " +                   command.toString() +
                " rejected from " +
                this.toString());
    }
    
    private boolean addWorker(Runnable command, boolean core) {
        for (; ; ) {
            int count = workerCount.get();
            if (count >= corePoolSize) {
                return false;
            }
            // 線程數+1, 成功向下走
            if (workerCount.compareAndSet(count, count + 1)) {
                break;
            }
        }
        Worker worker = new Worker(command);
        final Thread thread = worker.thread;
        workers.add(worker);
        thread.start();
        return true; 
    }
    
    
    // ----------------
    
    /**
     * 線程(理髮師) 
     */ 
    private class Worker implements Runnable {
    
        /**
         * 工做者運行的線程 
         */ 
        final Thread thread;
        /**
         * 初始運行的任務,可能爲 null 
         */ 
        Runnable firstTask;
        /**
         * 建立一個工做者 
         * 
         * @param firstTask 第一個任務,能夠爲 null
         */ 
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = new Thread(this);
        }
    
        @Override
        public void run() {
            System.out.println("run");
            runWorker(this);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Worker worker = (Worker) o;
            return Objects.equals(thread, worker.thread) &&
                        Objects.equals(firstTask, worker.firstTask);
        }

        @Override
        public int hashCode() {
            return Objects.hash(thread, firstTask);
        }
    }
    
    
    final void runWorker(Worker worker) {
        Runnable task = worker.firstTask;
        worker.firstTask = null;
        while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 運行完畢
                task = null;
            }
        }
    }
    
    
    private Runnable getTask() {
        for (; ; ) {
            try {
                //阻塞拿任務
                return workQueue.take();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!!!");
            }
        }
    }
    
}

測試代碼:this

package com.wandou.demo.thread.post.threapool;

import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author liming
 * @date 2020/9/17
 * @description
 */

public class MyThreadPoolDemo {
    private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private MyThreadPool myThreadPool = new MyThreadPool(3, workQueue);

    @Test
    public void t1() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(atomicInteger.incrementAndGet()
                            + "號顧客來理髮,爲其理髮的理髮師是:"
                            + Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        
        for (int i = 0; i < 30; i++) {
            myThreadPool.execute(task);
        }
        Thread.sleep(5000);
        System.out.println("================================================");
        
        for (int i = 0; i < 30; i++) {
            myThreadPool.execute(task);
        }
        //讓主線程阻塞等待
        System.in.read();
    }
    
}

測試結果:atom

image

相關文章
相關標籤/搜索