線程池之Executor框架

線程池之Executor框架

Java的線程既是工做單元,也是執行機制。從JDK5開始,把工做機單元和執行機制分離開來。工做單元包括Runnable和Callable,而執行機制由Executor框架提供。html

1. Executor框架簡介

1.1 Executor框架的兩級調度模型

在上層,Java多線程程序一般把應用分解爲若干個任務,而後使用用戶級的調度器(Executor框架)將這些任務映射爲固定數量的線程。java

在底層,操做系統內核將這些線程映射到硬件處理器上。小程序

image-20200820220148034

1.2 Executor框架的結構

Executor框架主要由3部分組成:服務器

  • 任務。包括被執行任務須要實現的接口:Runnable接口或者Callable接口。
  • 任務的執行。包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關鍵類實現了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  • 異步計算的結果。包括Future和實現Future的FutureTask類。

Executor框架的成員及其關係能夠用一下的關係圖表示:多線程

image-20200820221319222

Executor框架的使用示意圖:框架

image-20200820222719384

使用步驟:異步

  • 主線程首先建立實現Runnable或Callable接口的任務對象。工具類Executors能夠把一個Runnable對象封裝爲一個Callable對象(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result))。
  • 建立Executor接口的實現類ThreadPoolExecutor類或者ScheduledThreadPoolExecutor類的對象,而後調用其execute()方法或者submit()方法把工做任務添加到線程中,若是有返回值則返回Future對象。其中Callable對象有返回值,所以使用submit()方法;而Runnable可使用execute()方法,此外還可使用submit()方法,只要使用callable(Runnable task)或者callable(Runnable task, Object result)方法把Runnable對象包裝起來就能夠,使用callable(Runnable task)方法返回的null,使用callable(Runnable task, Object result)方法返回result。
  • 主線程能夠執行Future對象的get()方法獲取返回值,也能夠調用cancle()方法取消當前線程的執行。

1.3 Executor框架的使用案例

import java.util.concurrent.*;

public class ExecutorDemo {
    // 建立ThreadPoolExecutor實現類
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            10,
            100,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(5),
    );

    public static void main(String[] args) {
        // 採用submit()方法提交Callable對象並返回Future對象
        Future<String> future = executor.submit(new callableDemo());
        try {
            // get()方法獲取返回值
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            // 處理異常
            e.printStackTrace();
        } finally {
            // 關閉線程池
            executor.shutdown();
        }
    }
}

/**
 * 建立Callable接口的實現類
 */
class callableDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        String s = "return string";
        return s;
    }
}

2. Executor框架成員

image-20200820225956625

2.1 ThreadPoolExecutor

直接建立ThreadPoolExecutor的實例對象,見https://www.cnblogs.com/chiaki/p/13536624.htmlide

ThreadPoolExecutor一般使用工廠類Executors建立,能夠建立3種類型的ThreadPoolExecutor,即FixedThreadPool、SingleThreadExecutor以及CachedThreadPool。工具

  • FixedThreadPool適用於爲了知足資源管理的需求,而須要限制當先線程數量的應用場景,適用於負載比較重的服務器。ui

    public static ExecutorService es = Executors.newFixedThreadPool(int threadNums);
    public static ExecutorService es = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory);
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    把線程池最大線程數量maxmumPoolSize和核心線程池的數量corePoolSize設置爲threadNums,將參數keepAliveTime設置爲0L。使用無界隊列LinkedBlockingQueue做爲阻塞隊列,所以當任務不能馬上執行時,都會添加到阻塞隊列中,並且maximumPoolSize,keepAliveTime都是無效的。

  • SingleThreadExecutor:適用於須要保證順序地執行各個任務;而且在任意時間點,不會有多個線程是活動地應用場景。**

    public static ExecutorService es = Executors.newSingleThreadExecutor();
    public static ExecutorService es = Executors.newSingleThreadExecutor(ThreadFactory threadFactory);
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    由於阻塞隊列使用的是LinkedBlockingQueue,所以和FixedThreadPool同樣,參數maximumPoolSize以及keepAliveTime都是無效的。corePoolSize爲1,所以最多隻能建立一個線程

  • CachedThreadPool大小無界的線程池,適用於執行不少的短時間異步任務的小程序,或者是負載較輕的服務器。

    public static ExecutorService es = Executors.newCachedThreadPool();
    public static ExecutorService es = Executors.newCachedThreadPool(ThreadFactory threadFactory);
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    CachedThreadPool使用SynchronizedQueue做爲阻塞隊列,SynchronizedQueue是不存儲元素的阻塞隊列,實現「一對一的交付」,也就是說,每次向隊列中put一個任務必須等有線程來take這個任務,不然就會一直阻塞該任務,若是一個線程要take一個任務就要一直阻塞知道有任務被put進阻塞隊列。

    由於CachedThreadPool的maximumPoolSize爲Integer.MUX_VALUE,所以CachedThreadPool是無界的線程池,也就是說能夠一直不斷的建立線程,這樣可能會使CPU和內存資源耗盡。corePoolSize爲0 ,所以在CachedThreadPool中直接經過阻塞隊列來進行任務的提交

2.2 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor類繼承了ThreadPoolExecutor並實現了ScheduledExecutorService接口。主要用於在給定的延遲後執行任務或者按期執行任務。

ScheduledThreadPoolExecutor一般使用Executors工廠類來建立,可建立2種類型的ScheduledThreadPoolExecutor,即ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。

  • ScheduledThreadPoolExecutor:適用於若干個(固定)線程延時或者按期執行任務,同時爲了知足資源管理的需求而須要限制後臺線程數量的場景。

    public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums);
    public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums, ThreadFactory threadFactory);
  • SingleThreadScheduledExecutor:適用於須要單個線程延時或者按期的執行任務,同時須要保證各個任務順序執行的應用場景。

    public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums);
    public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory);

ScheduledThreadPoolExecutor的實現:

ScheduledThreadPoolExecutor的實現主要是經過把任務封裝爲ScheduledFutureTask來實現。經過調用scheduledAtFixedTime()方法或者scheduledWithFixedDelay()方法向阻塞隊列添加一個實現了RunnableScheduledFutureTask接口的ScheduledFutureTask類對象。ScheduledFutureTask主要包括3個成員變量:

// 序列號,用於保存任務添加到阻塞隊列的順序
private final long sequenceNumber;
// 用於保存該任務將要被執行的具體時間
private long time;
// 週期,用於保存任務直線的間隔週期
private final long period;

ScheduledTreadPoolExecutor的阻塞隊列是用無界隊列DelayQueue實現的,能夠實現元素延時delayTime後才能獲取元素,在ScheduledThreadPoolExecutor中,DelayQueue內部封裝了一個PriorityQueue,來對任務進行排序,首先對time排序,time小的在前,若是time同樣,則sequence小的在前,也就是說若是time同樣,那麼先被提交的任務先執行。

由於DelayQueue是一個無界的隊列,所以線程池的maximumPoolSize是無效的。ScheduledThreadPoolExecutor的工做流程大體以下:

image-20200820233614169

2.3 Future接口/FutureTask實現類

Future接口和實現Future接口的FutureTask實現類,表明異步計算的結果。

2.3.1 FutureTask的使用

FutureTask除了實現Future接口外還實現了Runnable接口。所以,FutureTask能夠交給Executor執行,也能夠條用線程直接執行(FutureTask.run())。根據FutureTask.run()方法被執行的時機,FutureTask可處於如下3種狀態:

  • 未啓動:建立了一個FutureTask對象但沒有執行FutureTask.run();
  • 已啓動:FutureTask.run()方法被執行的過程當中;
  • 已完成:FutureTask.run()正常執行結束,或者FutureTask被取消(FutureTask.cancel()),或者執行FutureTask.run()時拋出異常而異常結束;

狀態遷移示意圖:

image-20200820234612077

FutureTask的get和cancle執行示意圖:

image-20200820234818187

2.3.2 FutureTask的實現

FutureTask是一個基於AQS同步隊列實現的一個自定義同步組件,經過對同步狀態state的競爭實現acquire或者release操做。

FutureTask的內部類Sync實現了AQS接口,經過對tryAcquire等抽象方法的重寫和模板方法的調用來實現內部類Sync的tryAcquireShared等方法,而後聚合Sync的方法來實現FutureTask的get和cancel等方法。

FutureTask的設計示意圖:

image-20200820235358434

FutureTask的get方法最終會調用AQS.acquireSharedInterruptibly(int arg)方法:

  • 調用AQS.acquireSharedInterruptibly(int arg)方法會首先調用tryAcquireShared()方法判斷acquire操做是否能夠成功,能夠成功的條件是state爲執行完成狀態RAN或者已取消狀態CANCELLED,且runner不爲null;
  • 若是成功則get()方法當即返回,若是失敗則到線程等待隊列執行release操做;
  • 當其餘線程執行release操做喚醒當前線程後(好比FutureTask.run()FutureTask.cancle(...)),當前線程再次執行tryAcquireShared()將返回正值1,當前線程離開現場等待隊列並喚醒它的後繼線程(級聯喚醒);
  • 最後返回計算的結果或拋出異常。

image-20200821000310543

2.3.3 FutureTask的使用場景
  • 當一個線程須要等待另外一個線程把某個任務執行完之後它才能繼續執行時;
  • 有若干線程執行若干任務,每一個任務最多隻能被執行一次;
  • 當多個線程師徒執行同一個任務,但只能容許一個線程執行此任務,其它線程須要等這個任務被執行完畢之後才能繼續執行時。

2.4 Runnable和Callable接口

用於實現線程要執行的工做單元。

2.5 Executors工廠類

提供了常見配置線程池的方法,由於ThreadPoolExecutor的參數衆多且意義重大,爲了不配置出錯,纔有了Executors工廠類。

3. 爲何不建議使用Executors建立線程池?

FixedThreadPoolSingleThreadExecutor:容許請求的隊列長度爲Integer.MAX_VALUE(無界的阻塞隊列),可能堆積大量的請求,從而致使OOM。

CachedThreadPoolScheduledThreadPool:容許建立的線程數量爲Integer.MAX_VALUE(無界的阻塞隊列),可能會建立大量線程,從而致使OOM。

相關文章
相關標籤/搜索