Java入門系列之線程池ThreadPoolExecutor原理分析思考(十五)

前言

關於線程池原理分析請參看《http://objcoding.com/2019/04/25/threadpool-running/》,建議對原理不太瞭解的童鞋先看下此文而後再來看本文,這裏經過對原理的學習我談談對線程池的理解,如有錯誤之處,還望批評指正。服務器

線程池思考

線程池咱們可認爲是準備好執行應用程序級任務的預先實例化的備用線程集合,線程池經過同時運行多個任務來提升性能,同時防止線程建立過程當中的時間和內存開銷,例如,一個Web服務器在啓動時實例化線程池,這樣當客戶端請求進入時,它就不會花時間建立線程,與爲每一個任務都建立線程相比,線程池經過避免一次無限建立線程來避免資源(處理器,內核,內存等)用盡,建立必定數量的線程後,一般將多餘的任務放在等待隊列中,直到有線程可用於新任務。下面咱們經過一個簡單的例子來歸納線程池原理,以下:app

    public static void main(String[] args) {

        ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(5);

        ThreadPoolExecutor poolExecutor =
                new ThreadPoolExecutor(2,
                        5, Long.MAX_VALUE, TimeUnit.NANOSECONDS, arrayBlockingQueue);

        for (int i = 0; i < 11; i++) {
            try {
                poolExecutor.execute(new Task());
            } catch (RejectedExecutionException ex) {
                System.out.println("拒絕任務 = " + (i + 1));
            }
            printStatus(i + 1, poolExecutor);
        }
    }

    static void printStatus(int taskSubmitted, ThreadPoolExecutor e) {
        StringBuilder s = new StringBuilder();
        s.append("工做池大小 = ")
                .append(e.getPoolSize())
                .append(", 核心池大小 = ")
                .append(e.getCorePoolSize())
                .append(", 隊列大小 = ")
                .append(e.getQueue().size())
                .append(", 隊列剩餘容量 = ")
                .append(e.getQueue().remainingCapacity())
                .append(", 最大池大小 = ")
                .append(e.getMaximumPoolSize())
                .append(", 提交任務數 = ")
                .append(taskSubmitted);

        System.out.println(s.toString());
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000000);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }

如上例子很好的闡述了線程池基本原理,咱們聲明一個有界隊列(容量爲5),實例化線程池的核心池大小爲2,最大池大小爲10,建立線程沒有自定義實現,默認經過線程池工廠建立,拒絕策略爲默認,提交11個任務。在啓動線程池時,默認狀況下它將以無線程啓動,當咱們提交第一個任務時,將產生第一個工做線程,並將任務移交給該線程,只要當前工做線程數小於配置的核心池大小,即便某些先前建立的核心線程可能處於空閒狀態,也會爲每一個新提交的任務生成一個新的工做線程(注意:當工做線程池大小未超過核心池大小時以建立的Worker中的第一個任務執行即firstTask,而繞過了阻塞隊列),若超過核心池大小會將任務放入阻塞隊列,一旦阻塞隊列滿後將從新建立線程任務,若任務超過最大線程池大小將執行拒絕策略。當阻塞隊列爲無界隊列(如LinkedBlockingQueue),很顯然設置的最大池大小將無效。咱們再來闡述下,當工做線程數達到核心池大小時,若此時提交的任務愈來愈多,線程池的具體表現行爲是什麼呢?ide

一、只要有任何空閒的核心線程(先前建立的工做線程,但已經完成分配的任務),它們將接管提交的新任務並執行。源碼分析

二、若是沒有可用的空閒核心線程,則每一個提交的新任務都將進入已定義的工做隊列中,直到有一個核心線程能夠處理它爲止。若是工做隊列已滿,但仍然沒有足夠的空閒核心線程來處理任務,那麼線程池將恢復而建立新的工做線程,新任務將由它們來執行。 一旦工做線程數達到最大池大小,線程池將再次中止建立新的工做線程,而且在此以後提交的全部任務都將被拒絕。性能

由上述2咱們知道,一旦達到核心線程大小就會進入阻塞隊列(阻塞隊列未滿),咱們可認爲這是一種執行阻塞隊列優先的機制,那咱們是否是能夠思考一個問題:何不建立非核心線程來擴展線程池大小而不是進入阻塞隊列,當達到最大池大小時才進入阻塞隊列進行排隊,這種方式和默認實現方式在效率和性能上是否是可能會更好呢? 可是從另一個層面來說,既然不想很快進入阻塞隊列,那麼何不將指定的核心池大小進行擴展大一些呢?咱們知道線程數越多那麼將致使明顯的數據爭用問題,也就是說在非峯值系統中的線程數會不少,因此在峯值系統中經過建立非核心線程理論上是否是可以比默認當即進入阻塞隊列具備支撐規模化的任務更加具備性能上的優點呢?那麼咱們怎樣才能修改默認操做呢?咱們首先來看看在執行任務時的操做學習

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
    }
}

第一步獲得當前工做線程數若小於核心池大小,那麼將建立基於核心池的線程而後執行任務,這一點咱們沒毛病,第二步若工做線程大小超過核心池大小,若當前線程正處於運行狀態且將其任務放到阻塞隊列中,若失敗進行第三步建立非核心池線程,經過源碼分析得知,若核心池中線程即便有空閒線程也會建立線程執行任務,那麼咱們是否是能夠獲得核心池中是否有空閒的線程呢,如有而後才嘗試使其進入阻塞隊列,因此咱們須要重寫阻塞隊列中的offer方法,添加一個是否有空閒核心池的線程,讓其接待任務。因此咱們繼承上述有界阻塞隊列,以下:ui

public class CustomArrayBlockingQueue<E> extends ArrayBlockingQueue {

    private final AtomicInteger idleThreadCount = new AtomicInteger();

    public CustomArrayBlockingQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(Object o) {
        return idleThreadCount.get() > 0 && super.offer(o);
    }
}

可是不幸的是,經過對線程池源碼的分析,咱們並不可以獲得空閒的核心池的線程,可是咱們能夠跟蹤核心池中的空閒線程,在獲取任務方法中以下:spa

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}

try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}

如上截取獲取任務的核心,若工做線程大小大於核心池大小時,默認狀況下會進入阻塞隊列此時經過pool獲取阻塞隊列中的任務,若工做線程大小小於核心池大小時,此時會調用take方法獲從阻塞隊列中獲取可用的任務,此時說明當前核心池線程處於空閒狀態,若是隊列中沒有任務,則線程將在此調用時會阻塞,直到有可用的任務爲止,所以核心池線程仍然處於空閒狀態,因此咱們增長上述計數器,不然,調用方法返回,此時該線程再也不處於空閒狀態,咱們能夠減小計數器,重寫take方法,以下:線程

@Override
public Object take() throws InterruptedException {
    idleThreadCount.incrementAndGet();
    Object take = super.take();
    idleThreadCount.decrementAndGet();
    return take;
}

接下來咱們再來考慮timed爲true的狀況,在這種狀況下,線程將使用poll方法,很顯然,進入poll方法的任何線程當前都處於空閒狀態,所以咱們能夠在工做隊列中重寫此方法的實現,以在開始時增長計數器,而後,咱們能夠調用實際的poll方法,這可能致使如下兩種狀況之,若是隊列中沒有任務,則線程將等待此調用以提供所提供的超時,而後返回null。到此時,線程將超時,並將很快從池中退出,從而將空閒線程數減小1,所以咱們能夠在此時減小計數器,不然由方法調用返回,所以該線程再也不處於空閒狀態,此時咱們也能夠減小計數器。code

@Override
public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
    idleThreadCount.incrementAndGet();
    Object poll = super.poll(timeout, unit);
    idleThreadCount.decrementAndGet();
    return poll;
}

經過上述咱們對offer、pool、take方法的重寫,使得在沒有基於核心池的空閒線程進行擴展非核心線程,還未結束,若達到了最大池大小,此時咱們須要將其添加到阻塞隊列中排隊,因此最終使用咱們自定義的阻塞隊列,並使用自定義的拒絕策略,以下:

CustomArrayBlockingQueue<Runnable> arrayBlockingQueue = new CustomArrayBlockingQueue<>(5);

ThreadPoolExecutor poolExecutor =
        new ThreadPoolExecutor(10,
                100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, arrayBlockingQueue
                , Executors.defaultThreadFactory(), (r, executor) -> {
            if (!executor.getQueue().add(r)) {
                System.out.println("拒絕任務");
            }
        });

for (int i = 0; i < 150; i++) {
    try {
        poolExecutor.execute(new Task());
    } catch (RejectedExecutionException ex) {
        System.out.println("拒絕任務 = " + (i + 1));
    }
    printStatus(i + 1, poolExecutor);
}

上述咱們實現自定義的拒絕策略,將拒絕的任務放入到阻塞隊列中,若阻塞隊列已滿而不能再接收新的任務,咱們將調用默認的拒絕策略或者是其餘處理程序,因此在將任務添加到阻塞隊列中即調用add方法時,咱們還須要重寫add方法,以下:

@Override
public boolean add(Object o) {
    return super.offer(o);
}

總結

以上詳細內容只是針對線程池的默認實現而引起的思考,經過如上方式是否可以對於規模化的任務處理起來在性能上有必定改善呢?可能也有思慮不周全的地方,暫且分析於此。

相關文章
相關標籤/搜索