Java多線程ThreadPoolExecutor初探

我的博客項目地址java

但願各位幫忙點個star,給我加個小星星✨git


在java中,使用線程時經過new Thread實現很簡單,可是若是併發數量不少時,頻繁地建立線程就會大大下降系統的效率。github

因此能夠經過線程池,使得線程能夠複用,每執行完一個任務,並非被銷燬,而是能夠繼續執行其餘任務。編程

花了兩天時間去看了高洪巖寫的《JAVA併發編程》,是想要知其然,知其因此然,在使用的狀況下,瞭解學習了一下原理記錄下java.util.concurrent併發包下的ThreadPoolExecutor特性和實現緩存


使用示例

粗暴點,咱們直接看如何使用吧多線程

(一)使用Executors

簡單舉個🌰:
Executors.newCachedThreadPool();        //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //建立容量爲1的緩衝池
Executors.newFixedThreadPool(int);    //建立固定容量大小的緩衝池
具體實現邏輯:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
	public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

	public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
複製代碼

經過該Executors的靜態方法進行線程池的建立,並且從具體實現來看,仍是調用了new ThreadPoolExecutor(),只是內部參數已經幫咱們配置好了。併發

(二) 使用ThreadPoolExecutor

既然真正實現都是用ThreadPoolExecutor,那就本身設定好方法的參數吧。ide

public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.HOURS, new LinkedBlockingDeque<>());

        for(int i=0;i<10;i++){
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
                    executor.getQueue().size()+",已執行完別的任務數目:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();

    }

    static class MyTask implements Runnable {
        private int taskNum;

        public MyTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            System.out.println("正在執行task "+taskNum);
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"執行完畢");
        }
    }
複製代碼

打印效果以下:函數

正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
task 2執行完畢
task 0執行完畢
task 3執行完畢
task 1執行完畢
正在執行task 8
task 4執行完畢
正在執行task 7
正在執行task 6
正在執行task 5
正在執行task 9
task 8執行完畢
task 6執行完畢
task 7執行完畢
task 5執行完畢
task 9執行完畢
複製代碼

任務Task提交以後,因爲是多線程狀態下,因此打印效果並非同步的,能夠看出任務都已經順利執行。學習

我這個實現參數是5個corePoolSize核心線程數和5個maximumPoolSize最大線程數,當線程池中的線程數超過5個的時候,將新來的任務放進緩存隊列中,小夥伴能夠試下把任務數(for循環的個數)提升一點,讓緩存等待的任務數超過5個,看看默認的任務拒絕策略(AbortPolicy)會拋出什麼錯誤hhh

下面來看看ThreadPoolExecutor的廬山真面目吧~


ThreadPoolExecutor

它有如下四個構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

從構造方法能夠看出,前三個方法最終都是調用第四個構造器進行初始化工做的。

參數解釋:

  • corePoolSize:池中保持的線程數,包括空閒的線程,也就是核心池的大小
  • maximumPoolSize:池中鎖容許最大線程數
  • keepAliveTime:當線程數量超過corePoolSize,在沒有超過指定的時間內不從線程池中刪除,若是超過該時間,則刪除
  • unit:keepAliveTime的時間單位
  • workQueue:執行前用來保存任務的隊列,此隊列只保存由execute方法提交的Runnable任務

workQueue(任務隊列,是一個阻塞隊列)

ArrayBlockingQueue:
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

複製代碼
LinkedBlockingDeque:(支持列頭和列尾操做,pollFirst/pollLast)
public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }


    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

複製代碼

從源碼構造函數能夠看到,不傳參數的時候,默認阻塞隊列中的大小是Integer.MAX_VALUE;

SynchronousQueue:
public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
複製代碼

Array和Linked在傳入大小小於0時將會報錯,比較經常使用的是LinkedBlockingDeque和SynchronousQueue,線程池的排隊策略與BlockingQueue有關

ThreadFactory:線程工廠

主要用來建立線程,能夠在newThread()方法中自定義線程名字和設置線程異常狀況的處理邏輯。

舉個🌰:

static class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread();
            thread.setName("JingQ" + new Date());
            thread.setUncaughtExceptionHandler((t, e) -> {
                doSomething();
                e.printStackTrace();
            });
            return thread;
        }
    }
複製代碼

handler:拒絕策略

有如下四種:

  • ThreadPoolExecutor.AbortPolicy:當任務添加到線程中被拒絕時,它會拋出RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:任務被拒絕時,線程池丟棄被拒絕的任務
  • ThreadPoolExecutor.DiscardOldestPolicy:任務被拒絕時,線程池會放棄等待隊列中最舊的未處理文物,而後將被拒絕的任務添加到等待隊列中
  • ThreadPoolExecutor.CallerRunsPolicy:任務被拒絕時,會使用調用線程池的Thread線程對象處理被拒絕的任務

ThreadPoolExecutor繼承結構

能夠看出,實際上ThreadPoolExecutor是繼承了AbstractExecutorService類和引用了ExecutorService、Executor接口。

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {
 
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}
複製代碼

AbstarctExecutorService是一個抽象類,它實現的是ExecutorService接口

ExecutorService

public interface ExecutorService extends Executor {
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

接口ExecutorService引用了Executor接口,Executor接口比較簡單,只有一個execute方法定義

Executor

public interface Executor {
    void execute(Runnable command);
}
複製代碼

小結:

Executor是一個頂級接口,定義了一個execute方法,返回值爲空,參數爲Runnable。

ExecutorService繼承了Executor而且定義了其它一些方法,結果以下圖:

抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法。

最後ThreadPoolExecutor繼承了AbstractExecutorService,咱們最經常使用到它兩個方法,submit和execute,下面介紹一下這二者:

execute():
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * (如下是我的渣翻譯,有誤請輕噴~) * 有如下三步流程: * * 1. 若是少於核心池大小的線程正在運行, * 那麼嘗試以給定的命令做爲它的第一個任務啓動一個新線程。 * 調用添加worker原子性檢查運行狀態和workder的數量, * 這樣能夠防止錯誤警報在不該該返回的狀況下添加線程,返回false。 * * 2. 若是一個任務能夠成功地排隊,那麼咱們仍然須要再次檢查是否應該添加一個線程 * (由於現有的線程在上次檢查後死亡),或者是在該方法進入後關閉了池。 * 所以,咱們從新檢查狀態,若是必要的話,若是中止的話,須要回滾隊列。 * 若是沒有新的線程,就去啓動它 * * 3. 若是咱們不能排隊任務,那麼咱們嘗試添加一個新線程。 * 若是失敗了,咱們知道任務隊列已經被關閉或飽和,因此拒絕這個任務。 */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼
submit:
	public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

   
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

複製代碼

小結

execute()方法在ThreadPoolExecutor中進行了重寫,submit()方法是在AbstractExecutorService實現的,ThreadPoolExecutor並無重寫,而且execute方法是沒有返回結果的,submit的返回類型是Future,可以得到任務的結果,可是實際執行的仍是execute方法。

固然,還有例如shutdown、getQueue、getActiveCount、getPoolSize等方法沒有介紹到,推薦胖友們打開IDE進行查看吧~

ps:關於線程池的原理並未深刻記錄,有關它的任務拒絕策略、線程初始化、ThreadPoolExecutor構造以後,當任務超過設定值,它的執行策略等原理都值得去深刻學習,下回記錄~

相關文章
相關標籤/搜索