JAVA線程池的原理

 

提交一個線程到線程池:java

1.判斷核心線程是否都在執行任務,若是有空閒的或者沒有被建立的,則建立去執行新的工做線程執行任務,若是核心線程都在執行,則下一步;緩存

2.線程池判斷工做隊列是否已滿,沒有滿,則提交任務到工做等待隊列,若是工做等待隊列已滿,則下一步;bash

3.判斷線程池裏的全部工做線程是否都在工做,若是沒有,則建立一個新的工做線程執行任務,若是已滿,則根據飽和策略處理任務。服務器

飽和執行策略:併發

1.AbortPolicy 拒絕新任務,直接拋出異常異步

2.DiscardPolicy 直接丟棄任務spa

3.DiscardOldestPolicy 丟棄隊列中最老的任務 先將阻塞隊列中的頭元素出隊拋棄,再嘗試提交任務。線程

4.CallerRunsPolicy 將任務丟給調用線程執行rest

 

接口繼承關係code

Java API對ExecutorService接口的實現有兩個,因此這兩個便是Java線程池具體實現類:

1. ThreadPoolExecutor
2. ScheduledThreadPoolExecutor

ExecutorService還繼承了Executor接口(注意區分Executor接口和Executors工廠類),這個接口只有一個execute()方法,最後咱們看一下整個繼承樹

 

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
}

參數詳情:

corePoolSize 核心線程池大小,在剛開始建立線程池時,核心線程建立完不會當即啓動,有任務提交時纔會啓動達到核心線程數大小,若是一開始就要建立能夠調用prestartAllCoreThreads。

maximumPoolSize 容許的最大線程數量,當核心線程滿和阻塞隊列滿時纔會判斷最大線程數量,決定是否建立普通線程。

keepAliveTime 線程數大於核心線程數時,多餘空閒線程存活時間

unit keepAliveTime的時間單位

workQueue 線程數量超過核心線程數時用於保存任務,包含三種類型的BlockingQueue:有界隊列,無界隊列,同步移交。

    可選擇的BlockingQueue:

        1.LinkedBlockingQueue 無界隊列,遵循先進先出,使用該隊列作爲阻塞隊列時要尤爲小心,當任務耗時較長時可能會致使大量新任務在隊列中堆積最終致使OOM。閱讀代碼發現,Executors.newFixedThreadPool 採用就是 LinkedBlockingQueue,當QPS很高,發送數據很大,大量的任務被添加到這個無界LinkedBlockingQueue 中,致使cpu和內存飆升服務器掛掉。

        2.ArrayBlockingQueue 有界隊列 遵循先進先出,

        3.PriorityBlockingQueue 優先級隊列,任務的優先級由Comparator決定。

        4.SychronousQueue 將任務直接移交給工做線程,不是一個真正的隊列,是一種線程移交機制。必須有另外一個線程正在等待接收這個元素。只有在使用無界線程池或者有飽和策略時才建議使用該隊列。

 

threadPoolFactory 建立新線程時使用的工廠類

handler 阻塞隊列已滿,且線程數達到最大線程後所採起的飽和策略

 

ExecutorService的建立

Java給咱們提供了一個Executors工廠類,它能夠幫助咱們很方便的建立各類類型ExecutorService線程池,Executors一共能夠建立下面這四類線程池:

newCachedThreadPool:建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

初始化時核心線程數爲0,若是線程池長度超過處理須要,可當即回收空閒線程,無可回收則新建。

要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接收這個元素。所以即使SynchronousQueue一開始爲空且大小爲1,第一個任務也沒法放入其中,由於沒有線程在等待從SynchronousQueue中取走元素。所以第一個任務到達時便會建立一個新線程執行該任務。

 

newFixedThreadPool:建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }

固定大小線程數,使用無限大的LinkedBlockingQueue存聽任務。

 

newScheduledThreadPool:建立一個定長線程池,支持定時及週期性任務執行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

DelayedWorkQueue是一個無界阻塞隊列,是ScheduledThreadPoolExecutor的靜態內部類

newSingleThreadExecutor:建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
 }

首先new了一個線程數目爲 1 的ScheduledThreadPoolExecutor,再把該對象傳入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的實現代碼:

DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
} 

#父類:
DelegatedExecutorService(ExecutorService executor) { 
           e = executor; 
}

其實就是使用裝飾模式加強了ScheduledExecutorService(1)的功能,不只確保只有一個線程順序執行任務,也保證線程意外終止後會從新建立一個線程繼續執行任務。

 

Executors只是一個工廠類,它全部的方法返回的都是ThreadPoolExecutorScheduledThreadPoolExecutor這兩個類的實例。

ExecutorService的使用

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

executorService.shutdown();

ExecutorService的執行

ExecutorService有以下幾個執行方法:

- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny(...)
- invokeAll(...)

execute(Runnable)

這個方法接收一個Runnable實例,而且異步的執行,請看下面的實例:

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

executorService.shutdown();

這個方法有個問題,就是沒有辦法獲知task的執行結果。若是咱們想得到task的執行結果,咱們能夠傳入一個Callable的實例。

submit(Runnable)

submit(Runnable)execute(Runnable)區別是前者能夠返回一個Future對象,經過返回的Future對象,咱們能夠檢查提交的任務是否執行完畢,請看下面執行的例子:

Future future = executorService.submit(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

future.get();  //returns null if the task has finished correctly.

若是任務執行完成,future.get()方法會返回一個null。注意,future.get()方法會產生阻塞。

submit(Callable)

submit(Callable)submit(Runnable)相似,也會返回一個Future對象,可是除此以外,submit(Callable)接收的是一個Callable的實現,Callable接口中的call()方法有一個返回值,能夠返回任務的執行結果,而Runnable接口中的run()方法是void的,沒有返回值。請看下面實例:

Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
    System.out.println("Asynchronous Callable");
    return "Callable Result";
}
});

System.out.println("future.get() = " + future.get());

若是任務執行完成,future.get()方法會返回Callable任務的執行結果。注意,future.get()方法會產生阻塞。

invokeAny(…)

invokeAny(...)方法接收的是一個Callable的集合,執行這個方法不會返回Future,可是會返回全部Callable任務中其中一個任務的執行結果。這個方法也沒法保證返回的是哪一個任務的執行結果,反正是其中的某一個。請看下面實例:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 2";
}
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 3";
}
});

String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();

invokeAll(…)

invokeAll(...)與 invokeAny(...)相似也是接收一個Callable集合,可是前者執行以後會返回一個Future的List,其中對應着每一個Callable任務執行後的Future對象。狀況下面這個實例

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 1";
}
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 3";
}
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}

executorService.shutdown();

ExecutorService的關閉

當咱們使用完成ExecutorService以後應該關閉它,不然它裏面的線程會一直處於運行狀態。

舉個例子,若是的應用程序是經過main()方法啓動的,在這個main()退出以後,若是應用程序中的ExecutorService沒有關閉,這個應用將一直運行。之因此會出現這種狀況,是由於ExecutorService中運行的線程會阻止JVM關閉。

若是要關閉ExecutorService中執行的線程,咱們能夠調用ExecutorService.shutdown()方法。在調用shutdown()方法以後,ExecutorService不會當即關閉,可是它再也不接收新的任務,直到當前全部線程執行完成纔會關閉,全部在shutdown()執行以前提交的任務都會被執行。

若是咱們想當即關閉ExecutorService,咱們能夠調用ExecutorService.shutdownNow()方法。這個動做將跳過全部正在執行的任務和被提交尚未執行的任務。可是它並不對正在執行的任務作任何保證,有可能它們都會中止,也有可能執行完成。

正確結束線程應該在線程內先使用break結束任務。

相關文章
相關標籤/搜索