Java併發編程實戰筆記4:線程池

在生產環境中,爲每一個任務分配一個線程存在一些缺陷,尤爲是當須要建立大量線程時。java

  • 線程生命週期的開銷很是高。線程的建立和銷燬都是有開銷的。
  • 資源消耗。活躍的線程會消耗系統資源,尤爲是內存。
  • 穩定性。在可建立線程的數量上存在一個限制。若是破壞了這個限制,極可能拋出OutOfMemoryError異常。

也就是說,在必定範圍內,增長線程能夠提升系統的吞吐率,可是超過了這個範圍,再建立更多的線程只會下降程序的執行速度。web

在《阿里巴巴Java手冊》中提到: 編程

線程池是指管理一組同構工做線程的資源池。線程從工做隊列中獲取一個任務,執行任務,而後返回線程等待下一個任務。緩存

在線程池中執行任務比爲每個任務分配一個線程優點更多。經過重用現有的線程而不是建立新線程,能夠分攤在線程建立和銷燬過程當中產生的巨大開銷。當請求到達時,工做線程一般已經存在,所以不會因爲等待建立線程延遲任務執行,提升了響應性。經過適當調整線程池的大小,能夠建立足夠多的線程使得處理器保持忙綠,同時還能夠防止過多線程互相競爭使得內存耗盡。服務器

所以應該使用線程池。其目的包括:多線程

  • 線程是稀缺資源,不能頻繁建立
  • 線程的建立和執行徹底分開,方便維護,進行了解耦
  • 將線程放入池中,能夠給其餘任務複用

Executor框架

Executor框架是在java.util.concurrent包下,在Java 5中引入的。經過Executor來啓動線程比使用Thread.start()方法更好,更易於管理且效率更高(用線程池實現節約了開銷),而且還有助於避免this逸出。併發

Executor兩級調度模型以下。用戶將多個任務提交給Executor框架,框架在線程池中分配線程執行它們。而後操做系統再將這些線程分配給處理器執行。 框架

Executor框架結構圖:
框架中的全部類能夠分爲三類:

  1. 任務:Runnable和Callable
  2. 任務執行器:Executor,其子接口爲ExecutorService,該子接口的實現類有:ThreadPoolExecutor和ScheduledThreadPoolExecutor
  3. 執行結果:使用Future接口表示異步的執行結果,其實現類爲FutureTask。

Executor

Executor是個簡單的接口,它支持多種不一樣類型的任務執行策略,提供了一種標準的方法將任務的提交過程與執行過程解耦,並用Runnable表示任務。Executor的實現還提供了對生命週期的支持。異步

Executor接口中之定義了一個方法execute(Runnable command),該方法接收一個Runable實例,它用來執行一個任務,任務即一個實現了Runnable接口的類。socket

Executor基於生產者-消費者模式,提交任務的操做至關於生產者,執行任務的線程至關於消費者。

class TaskExecWebServer{
    private static final int NTHREADS=100;
    private static final Executor executor=Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket=new ServerSocket(8080);
        while(true){
            final Socket socket=serverSocket.accept();
            Runnable task=new Runnable() {
                @Override
                public void run() {
                    System.out.println(socket);
                }
            };
            executor.execute(task);
        }
    }
}

class ThreadPerTaskExecutor implements Executor{
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

class WithinThreadExecutor implements Executor{
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}
複製代碼

在上述代碼中,咱們首先在webServer中使用了一個固定大小爲100的線程池。而後使用executor.execute(task)來提交線程。在執行的過程當中,能夠採用不一樣的執行方法。能夠爲每一個請求啓動一個新的線程,如ThreadPerTaskExecutor,還能夠以同步方式執行全部任務,如WithinThreadExecutor。這樣就實現了把任務的提交和執行解耦。

ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 
複製代碼
  • corePoolSize是線程池的基本大小。
  • maximumPoolSize是線程池的最大線程大小。
  • keepAliveTimeunit是線程空閒後的存活時間
  • workQueue是用於存聽任務的阻塞隊列
  • handler是當隊列和最大線程池都滿了以後的飽和策略

execute()

  1. 首先獲取線程池的狀態
  2. 若是線程池中線程數量少於corePoolSize,即便線程池中有空閒線程,也會建立一個新線程來執行新添加的任務。
  3. 若是當前線程處於運行狀態,而且成功的寫入了阻塞隊列
  4. 雙重檢查,再次獲取線程狀態;若是線程不是運行狀態就須要從阻塞隊列移除,並嘗試判斷線程是否執行完畢。同時執行拒絕策略。
  5. 若是當前線程池爲空就新建立一個線程並執行。
  6. 若是在第三部的判斷爲非運行狀態,建立新線程,失敗則執行拒絕策略。

生命週期

爲了解決執行服務的生命週期問題,Executor拓展了ExecutorService接口,添加了一些用於生命週期管理的方法。ExecutorService的生命週期有三種狀態:運行、關閉和已終止。

在初始建立時,ExecutorService處於運行狀態。使用shutdown()方法將再也不接受新的任務,同時等待提交的任務都執行完成以後再關閉。調用shutdownNow()方法,至關於調用了每一個線程的interrupt()方法,將嘗試取消全部運行中的任務,而且再也不啓動隊列中還沒有開始執行的任務。

在ThreadPoolExecutor中,定義了以下狀態:

  • RUNNING 是運行狀態,能夠接受執行隊列中的任務
  • SHOTDOWN 是調用了shutdown()方法
  • STOP 是調用了shutdownNow()方法
  • TIDYING 是全部任務都執行完畢的狀態,在調用shutdown()或shutdownNow()時都會嘗試更新爲這個狀態
  • TERMINATED 是終止狀態,當執行terminated()會更新爲這個狀態

線程池

Executors提供了一系列工廠方法用於建立線程池,返回的線程池都實現了ExecutorService接口。

1.newFixedThreadPool(int nThreads)

建立了固定線程數目的線程池,每當提交一個任務時就建立一個縣城,直到達到線程池的最大數量。當線程池滿時,有新的線程要創建,只能放在另外的隊列中等待,直到當前的線程中某個線程被移出。

針對一些穩定的併發線程,多用於服務器。

2.newCachedThreadPool()

建立了一個能夠緩存的線程池,沒有規模限制。調用execute將重用之前構造的線程(若是線程可用)。若是當前沒有可用線程,則建立一個新線程;若是當前規模超過了處理需求,將從緩存中終止並移除已經有60秒未使用的線程。

放入該線程池的線程超過timeout不活動,會被自動終止,缺省設置爲60秒。

一般用於執行一些生存期很短的異步型任務。

3.newSingleThreadExecutor()

建立了一個單線程的Executor,建立了單個線程來執行任務,若是線程異常結束,會建立另外一個線程來替代。

確保依照任務在隊列中的順序來串行執行(FIFO,LIFO,優先級)。

注意

4.newSechduledThreadPool(int corePoolSize)

與前幾種不一樣的是,這個方法返回的是ScheduledExecutorService,前邊三種返回的是ExecutorService。建立了一個固定長度的線程池,而且是支持定時以及週期性的任務執行的線程池。多數狀況下用來替代Timer類。

Timer與ScheduledThreadPoolExecutor

Timer用來負責管理延遲任務以及週期任務,可是它存在一些缺陷,應該使用與ScheduledThreadPoolExecutor來替代它。

Timer類在執行全部定時任務時只會建立一個線程,當有多個定時任務時,就會產生延遲。

當多個定時任務中有一個任務拋出異常,全部的任務都沒法執行。

Timer執行週期任務時依賴系統時間。後者不會因爲系統時間的改變而發生執行的變化。

提交與執行任務

任務分爲兩類,一類是實現了Runnable接口的類,一類是實現了Callable接口的類,都能被Executor執行。

Runnable是一種有侷限的抽象,由於它不能返回值或者拋出一個受檢查的異常。

Callable的call()方法相似於Runnable的run()方法,可是call()方法有返回值,而且可能拋出一個異常。

任務的生命週期包括:建立,提交,開始和完成。在Executor框架中,已經提交但還沒有開始的任務能夠取消,對於已經開始執行的任務,只有當它們響應中斷時,才能取消。

向線程池中提交線程的時候有兩種方法:execute()和submit()。

execute()提交只能提交一個Runnable對象,且返回值是void。也就是說提交後若是線程運行,和主線程就脫離了關係。

submit()提交:

  • Future submit(Callable task)
  • Future<?> submit(Runnable task)
  • Future submit(Callable task,T result)

submit()方法能夠提交一個Callable接口的對象,也能提交一個Ruuable的對象。使用這種方式提交會返回一個Future對象,表明了該線程的執行結果,主線程經過get方法獲取到從線程中返回的結果數據。使用Future能夠表示一個任務的生命週期,並提供了相應的方法來判斷是否已經完成或取消;還能獲取任務的結果;並能取消任務。

Future的get()方法:

  • 任務已經完成,返回結果或者拋出一個異常
  • 任務沒有完成,將阻塞直至完成
  • 任務拋出異常,get()方法將該異常封裝爲ExecutionExcepiton並從新拋出,可使用getCause()來獲取被封裝的初始異常
  • 任務被取消,get()方法將拋出CancellationException

CompletionService

將executor與blockingqueue融合在一塊兒,將Callable任務交由它執行,使用相似隊列操做的take和poll等方法得到future。也就是在構造函數中建立了阻塞隊列來保存執行完成的結果。

爲任務設計時限

使用Future.get(long,timeType)爲任務設計時限,時限內有結果則返回,超時則拋出TimeOutException

支持時限的還有invokeAll()方法,它將多個任務提交給ExecutorService並獲取結果。其參數爲一組任務,返回一組Future,按照任務集合中的順序將Future添加到返回的集合,實現了兩者的關聯。當全部任務執行完畢,或超市,或中斷,該方法將返回。經過get或isCancelled來判斷每一個任務的執行狀況。

參考資料

相關文章
相關標籤/搜索