多線程之線程池-當任務漸增時的處理-各個參數的含義- 阿里,美團,京東面試題目

阿里的面試官問了個問題,若是corePolllSize=10,MaxPollSize=20,若是來了25個線程 怎麼辦,html

 先 達到 corePoolSize,而後 優先放入隊列,而後在到MaxPollSize;而後拒絕;java

答案:面試

當一個任務經過execute(Runnable)方法欲添加到線程池時: 一、 若是此時線程池中的數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。 二、 若是此時線程池中的數量等於 corePoolSize,可是緩衝隊列 workQueue未滿,那麼任務被放入緩衝隊列。 三、若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,再有新的線程,開始增長線程池的線程數量處理新的線程,直到maximumPoolSize; 四、 若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的策略來處理此任務。也就是:處理任務的優先級爲:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。 五、 當線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止。這樣,線程池能夠動態的調整池中的線程數。

當線程數小於corePoolSize時,提交一個任務建立一個線程(即便這時有空閒線程)來執行該任務。
當線程數大於等於corePoolSize,首選將任務添加等待隊列workQueue中(這裏的workQueue是上面的BlockingQueue),等有空閒線程時,讓空閒線程從隊列中取任務。
當等待隊列滿時,若是線程數量小於maximumPoolSize則建立新的線程,不然使用拒絕線程處理器來處理提交的任務。編程

 

 

慢慢的啓動到10,而後把剩下的15個放到阻塞隊列裏面,並開始在線程池裏面建立線程,直到最大MaximumPoolSize;緩存

固然是先放在阻塞隊列(若是數量爲0,就一直等待,LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列,兩邊均可以進出的,那種,多線程

參考:聊聊併發(七)——Java中的阻塞隊列)裏面了,BlockingQueue,面試官想知道具體的處理流程,我掌握的不深,因而下定決心好好查查:併發

尤爲是那個車間裏工人的例子,好好看看,理解線程頗有用:ide

 

在上一章中咱們概述了一下線程池,這一章咱們看一下建立newFixedThreadPool的源碼。例子仍是咱們在上一章中寫的那個例子。函數

建立newFixedThreadPool的方法:

 

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}  

 

   

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>(),  
                                  threadFactory);  
}  

 

上面這兩個方法是建立固定數量的線程池的兩種方法,二者的區別是:第二種建立方法多了一個線程工廠的方法。咱們繼續看ThreadPoolExecutor這個類中的構造函數:post

 

ThreadPoolExecutor的構造函數:

  

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
         Executors.defaultThreadFactory(), defaultHandler);  
}  

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory,  
                          RejectedExecutionHandler handler) {  
    if (corePoolSize < 0 ||  
        maximumPoolSize <= 0 ||  
        maximumPoolSize < corePoolSize ||  
        keepAliveTime < 0)  
        throw new IllegalArgumentException();  
    if (workQueue == null || threadFactory == null || handler == null)  
        throw new NullPointerException();  
    this.corePoolSize = corePoolSize;  
    this.maximumPoolSize = maximumPoolSize;  
    this.workQueue = workQueue;  
    this.keepAliveTime = unit.toNanos(keepAliveTime);  
    this.threadFactory = threadFactory;  
    this.handler = handler;  
}  

 

ThreadPollExecutor中的全部的構造函數最終都會調用上面這個構造函數,接下來咱們來分析一下這些參數的含義: 

corePoolSize:

線程池啓動後,在池中保持的線程的最小數量。須要說明的是線程數量是逐步到達corePoolSize值的。例如corePoolSize被設置爲10,而任務數量只有5,則線程池中最多會啓動5個線程,而不是一次性地啓動10個線程。

maxinumPoolSize:

線程池中能容納的最大線程數量,若是超出,則使用RejectedExecutionHandler拒絕策略處理。 

keepAliveTime:

線程的最大生命週期。這裏的生命週期有兩個約束條件:一:該參數針對的是超過corePoolSize數量的線程;二:處於非運行狀態的線程。舉個例子:若是corePoolSize(最小線程數)爲10,maxinumPoolSize(最大線程數)爲20,而此時線程池中有15個線程在運行,過了一段時間後,其中有3個線程處於等待狀態的時間超過keepAliveTime指定的時間,則結束這3個線程,此時線程池中則還有12個線程正在運行。

unit:

這是keepAliveTime的時間單位,能夠是納秒,毫秒,秒,分鐘等。

workQueue: 

任務隊列。當線程池中的線程都處於運行狀態,而此時任務數量繼續增長,則須要一個容器來容納這些任務,這就是任務隊列。這個任務隊列是一個阻塞式的單端隊列。 

newFixedThreadPoolnewSingleThreadExector使用的是LinkedBlockingQueue的無界模式(美團面試題目)。

 

threadFactory:

定義如何啓動一個線程,能夠設置線程的名稱,而且能夠肯定是不是後臺線程等。

handler:

拒絕任務處理器。因爲超出線程數量和隊列容量而對繼續增長的任務進行處理的程序。
OK,ThreadPoolExecutor中的主要參數介紹完了。咱們再說一下線程的管理過程: 首先建立一個線程池,而後根據任務的數量逐步將線程增大到corePoolSize,若是此時仍有任務增長,則放置到workQueue中,直到workQueue爆滿爲止,而後繼續增長池中的線程數量(加強處理能力),最終達到maxinumPoolSize。那若是此時還有任務要增長進來呢?這就須要handler來處理了, 或者丟棄新任務,或者拒絕新任務,或者擠佔已有的任務(拒絕策略,美團面試)。在任務隊列和線程池都飽和的狀況下,一旦有線程處於等待(任務處理完畢,沒有新任務)狀態的時間超過keepAliveTime,則該線程終止,也就是說池中的線程數量會逐漸下降,直至爲corePoolSize數量爲止。
總結:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
RejectedExecutionHandler handler)
 
corePoolSize: 線程池維護線程的最少線程數,也是核心線程數,包括空閒線程
maximumPoolSize: 線程池維護線程的最大線程數
keepAliveTime: 線程池維護線程所容許的空閒時間
unit: 程池維護線程所容許的空閒時間的單位
workQueue: 線程池所使用的緩衝隊列
handler: 線程池對拒絕任務的處理策略

 

 
 
 
在《編寫高質量代碼 改善Java程序的151個建議》這本書裏舉的這個例子很形象:
 
OK,接下來咱們來看一下怎麼往任務隊裏中放入線程任務:在java.util.concurrent.AbstractExecutorService這個類的submit方法

submit方法

public Future<?> submit(Runnable task) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<Void> ftask = newTaskFor(task, null);  
    execute(ftask);//執行任務  
    return ftask;  
}  
  
/** 
 * @throws RejectedExecutionException {@inheritDoc} 
 * @throws NullPointerException       {@inheritDoc} 
 */  
public <T> Future<T> submit(Runnable task, T result) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<T> ftask = newTaskFor(task, result);  
    execute(ftask);//執行任務  
    return ftask;  
}  
  
/** 
 * @throws RejectedExecutionException {@inheritDoc} 
 * @throws NullPointerException       {@inheritDoc} 
 */  
public <T> Future<T> submit(Callable<T> task) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<T> ftask = newTaskFor(task);  
    execute(ftask);//執行任務  
    return ftask;  
}  

 

這是三個重載方法,分別對應Runnable、帶結果的Runnable接口和Callable回調函數。其中的newTaskFor也是一個重載的方法,它經過層層的包裝,把Runnable接口包裝成了適配RunnableFuture的實現類,底層實現以下:

public FutureTask(Runnable runnable, V result) {  
    this.callable = Executors.callable(runnable, result);  
    this.state = NEW;       // ensure visibility of callable  
}  

 

 
public static <T> Callable<T> callable(Runnable task, T result) {  
    if (task == null)  
        throw new NullPointerException();  
    return new RunnableAdapter<T>(task, result);  
}  

 

static final class RunnableAdapter<T> implements Callable<T> {  
    final Runnable task;  
    final T result;  
    RunnableAdapter(Runnable task, T result) {  
        this.task = task;  
        this.result = result;  
    }  
    public T call() {  
        task.run();  
        return result;  
    }  
}  

 

在submit中最重要的是execute這個方法,這個方法也是咱們分析的重點

execute方法:

 
public void execute(Runnable command) {  
    if (command == null)  
        throw new NullPointerException();  
    int c = ctl.get();  
    if (workerCountOf(c) < corePoolSize) {//  
        if (addWorker(command, true))  
            return;  
        c = ctl.get();  
    }  
    if (isRunning(c) && workQueue.offer(command)) {  
        int recheck = ctl.get();  
        if (! isRunning(recheck) && remove(command))  
            reject(command);  
        else if (workerCountOf(recheck) == 0)  
            addWorker(null, false);  
    }  
    else if (!addWorker(command, false))  
        reject(command);  
}  

 

在這個方法中分爲三部分
一、若是少於corePoolSize數量的線程在運行,則啓動一個新的線程並把傳進來的Runnable作爲第一個任務。而後會檢查線程的運行狀態和worker的數量,阻止不符合要求的任務添加到線程中
二、若是一個任務成功的放入到了隊列中,咱們仍然須要二次檢查咱們是否應該添加線程或者中止。所以咱們從新檢查線程狀態,是否須要回滾隊列,或者是中止或者是啓動一個新的線程
三、若是咱們不能添加隊列任務了,可是仍然在往隊列中添加任務,若是添加失敗的話,用拒絕策略來處理。
這裏最主要的是addWorker這個方法:
try {  
    w = new Worker(firstTask);  
    final Thread t = w.thread;  
    if (t != null) {  
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try {  
            // Recheck while holding lock.  
            // Back out on ThreadFactory failure or if  
            // shut down before lock acquired.  
            int rs = runStateOf(ctl.get());  
  
            if (rs < SHUTDOWN ||  
                (rs == SHUTDOWN && firstTask == null)) {  
                if (t.isAlive()) // precheck that t is startable  
                    throw new IllegalThreadStateException();  
                workers.add(w);  
                int s = workers.size();  
                if (s > largestPoolSize)  
                    largestPoolSize = s;  
                workerAdded = true;  
            }  
        } finally {  
            mainLock.unlock();  
        }  
        if (workerAdded) {  
            t.start();  
            workerStarted = true;  
        }  
    }  
} finally {  
    if (! workerStarted)  
        addWorkerFailed(w);  
}  

 

咱們在這個方法裏建立一個線程,注意這個線程不是咱們的任務線程,而是通過包裝的Worker線程。因此這裏的run方法是Worker這個類中的run方法。execute方法是經過Worker類啓動的一個工做線程,執行的是咱們的第一個任務,而後該線程經過getTask方法從任務隊列總獲取任務,以後再繼續執行。這個任務隊列是一個BlockingQueue,是一個阻塞式的,也就是說若是該隊列元素爲0,則保持等待狀態。直到有任務進入爲止。
 

Java中的線程池

咱們通常將任務(Task)提交到線程池中運行,對於一個線程池而言,須要關注的內容有如下幾點:
在什麼樣的線程中執行任務
任務按照什麼順序來執行(FIFO,LIFO,優先級)
最多有多少個任務能併發執行
最多有多個任務等待執行
若是系統過載則須要拒絕一個任務,如何通知任務被拒絕?
在執行一個任務以前或以後須要進行哪些操做
圍繞上面的問題,咱們來研究一下java中的線程池

線程池的建立

Exectors.newFixedThreadPool(int size):建立一個固定大小的線程池。 每來一個任務建立一個線程,當線程數量爲size將會中止建立。當線程池中的線程已滿,繼續提交任務,若是有空閒線程那麼空閒線程去執行任務,不然將任務添加到一個無界的等待隊列中。
Exectors.newCachedThreadPool():建立一個可緩存的線程池。對線程池的規模沒有限制,當線程池的當前規模超過處理需求時(好比線程池中有10個線程,而須要處理的任務只有5個),那麼將回收空閒線程。當需求增長時則會添加新的線程。
Exectors.newSingleThreadExcutor():建立一個單線程的Executor,它建立單個工做者線程來執行任務,若是這個線程異常結束,它會建立另外一個線程來代替。
Exectors.newScheduledThreadPool():建立一個固定長度的線程池,並且以延遲或定時的方式來執行任務。
上面都是經過工廠方法來建立線程池,其實它們內部都是經過建立ThreadPoolExector對象來建立線程池的。下面是ThreadPoolExctor的構造函數。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    ...
}

 

咱們看到構造函數是public類型的,因此咱們也能夠自定義本身的線程池。

在什麼樣的線程中執行任務?

java中對於任務的描述有兩種,一種是Runnable型的任務,一種是Callable型的任務。前者運行結束後不會返回任何東西,然後者能夠返回咱們須要的計算結果,甚至異常。

在沒有返回值的線程中運行

建立一個線程池,而後調用其execute方法,並將一個Runnable對象傳遞進去便可。

ExectorService exector = Exectors.newCachedThreadPool();
exector.execute(new Runnable(){
public void run(){
System.out.println("running...");
}
});

 

在有返回值的線程中運行
ExectorService exector = Exectors.newCachedThreadPool();
Callable<Result> task = new Callable<Result>() {
    public Result call() {
        return new Computor().compute();
    }
};
Future<Result> future = exector.submit(task);
result = future.get();  //改方法會一直阻塞,直到提交的任務被運行完畢

 

任務按照什麼順序來執行(FIFO,優先級)

若是任務按照某種順序來執行的話,則任務必定是串行執行的。咱們能夠看到在ThreadPoolExecutor中第四個參數是BlockingQueue,提交的任務都先放到該隊列中。若是傳入不一樣的BlockQueue就能夠實現不一樣的執行順序。傳入LinkedBlockingQueue則表示先來先服務,傳入PriorityBlockingQueue則使用優先級來處理任務

Exectors.newSingleThreadExcutor()使用的是先來先服務策略

最多有多少個任務能併發執行

線程池中的線程會不斷從workQueue中取任務來執行,若是沒任務可執行,則線程處於空閒狀態。
在ThreadPoolExecutor中有兩個參數corePoolSize和maximumPoolSize,前者被稱爲基本大小,表示一個線程池初始化時,裏面應該有的必定數量的線程。可是默認狀況下,ThreadPoolExecutor在初始化是並不會立刻建立corePoolSize個線程對象,它使用的是懶加載模式。

  • 當線程數小於corePoolSize時,提交一個任務建立一個線程(即便這時有空閒線程)來執行該任務。
  • 當線程數大於等於corePoolSize,首選將任務添加等待隊列workQueue中(這裏的workQueue是上面的BlockingQueue),等有空閒線程時,讓空閒線程從隊列中取任務。
  • 當等待隊列滿時,若是線程數量小於maximumPoolSize則建立新的線程,不然使用拒絕線程處理器來處理提交的任務。

最多有多少的任務等待執行

這個問題和BlockingQueue相關。 BlockingQueue有三個子類,一個是ArrayBlockingQueue(有界隊列),一個是LinkedBlockingQueue(默認無界,但能夠配置爲有界),PriorityBlockingQueue(默認無界,可配置爲有界)。因此,對於有多少個任務等待執行與傳入的阻塞隊列有關。

newFixedThreadPoolnewSingleThreadExector使用的是LinkedBlockingQueue的無界模式。而newCachedThreadPool使用的是SynchronousQueue,這種狀況下線程是不須要排隊等待的,SynchronousQueue適用於線程池規模無界。

若是系統過載則須要拒絕一個任務,如何通知任務被拒絕?

當有界隊列被填滿或者某個任務被提交到一個已關閉的Executor時將會啓動飽和策略,即便用RejectedExecutionHandler來處理。JDK中提供了幾種不一樣的RejectedExecutionHandler的實現:AbortPolicy,CallerRunsPolicy, DiscardPolicy和DiscardOldestPolicy。

AbortPolicy:默認的飽和策略。該策略將拋出未檢查的RejectedExcutionException,調用者能夠捕獲這個異常,而後根據本身的需求來處理。

DiscardPolicy:該策略將會拋棄提交的任務

DiscardOldestPolicy:該策略將會拋棄下一個將被執行的任務(處於隊頭的任務),而後嘗試從新提交該任務到等待隊列

CallerRunsPolicy:該策略既不會拋棄任務也不會拋出異常,而是在調用execute()的線程中運行任務。好比咱們在主線程中調用了execute(task)方法,可是這時workQueue已經滿了,而且也不會建立的新的線程了。這時候將會在主線程中直接運行execute中的task。

在執行一個任務以前或以後須要進行哪些操做

ThreadPoolExecutor是可擴展的,它提供了幾個能夠重載的方法:beforeExecute,afterExecuteterminated,這裏用到了面向的切面編程的思想。不管任務是從run中正常返回,仍是拋出異常而返回,afterExectue都會被調用。若是 beforeExecute中拋出了一個 RunntimeException,那麼任務將不會被執行,而且 afterExecute也不會被調用。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void main(String[] args) {
        TimingThreadPool executor = new TimingThreadPool(5, 10, 1,
                TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 5; i++)
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("running1....");
                }
            });
        executor.shutdown();
    }
}

class TimingThreadPool extends ThreadPoolExecutor {

    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            System.out.println(String.format("Terminated: arg time = %d",
                    totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

 

上面的代碼統計任務平均執行時間,在每一個線程中beforeExecute和afertExecute都會執行一次,而terminated等線程池關閉的時候執行

參考:Java多線程和線程池

參考:java中的線程池 

相關文章
相關標籤/搜索