11 java 線程池 使用實例

在前面的文章中,咱們使用線程的時候就去建立一個線程,這樣實現起來很是簡便,可是就會有一個問題:html

若是併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大下降系統的效率,由於頻繁建立線程和銷燬線程須要時間。java

那麼有沒有一種辦法使得線程能夠複用,就是執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務?緩存

在Java中能夠經過線程池來達到這樣的效果。服務器

 

1 線程池作什麼網絡

 

網絡請求一般有兩種形式:多線程

第一種,請求不是很頻繁,並且每次鏈接後會保持至關一段時間來讀數據或者寫數據,最後斷開,如文件下載,網絡流媒體等。架構

另外一種形式是請求頻繁,可是鏈接上之後讀/寫不多量的數據就斷開鏈接。考慮到服務的併發問題,若是每一個請求來到之後服務都爲它啓動一個線程,那麼這對服務的資源可能會形成很大的浪費,特別是第二種狀況。併發

由於一般狀況下,建立線程是須要必定的耗時的,設這個時間爲T1,而鏈接後讀/寫服務的時間爲T2,當T1>>T2時,咱們就應當考慮一種策略或者機制來控制,使得服務對於第二種請求方式也能在較低的功耗下完成。異步

一般,咱們能夠用線程池來解決這個問題,首先,在服務啓動的時候,咱們能夠啓動好幾個線程,並用一個容器(如線程池)來管理這些線程。ide

當請求到來時,能夠從池中取一個線程出來,執行任務(一般是對請求的響應),當任務結束後,再將這個線程放入池中備用;

若是請求到來而池中沒有空閒的線程,該請求須要排隊等候。最後,當服務關閉時銷燬該池便可。

 

多線程技術主要解決處理器單元內多個線程執行的問題,它能夠顯著減小處理器單元的閒置時間,增長處理器單元的吞吐能力。
假設一個服務器完成一項任務所需時間爲:T1 建立線程時間,T2 在線程中執行任務的時間,T3 銷燬線程時間。
若是:T1 + T3 遠大於 T2,則能夠採用線程池,以提升服務器性

線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提升服務器程序性能的

它把T1,T3分別安排在服務器程序的啓動和結束的時間段或者一些空閒的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。

線程池不只調整T1,T3產生的時間段,並且它還顯著減小了建立線程的數目,看一個例子:

假設一個服務器一天要處理50000個請求,而且每一個請求須要一個單獨的線程完成。在線程池中,線程數通常是固定的,因此產生線程總數不會超過線程池中線程的數目,

而若是服務器不利用線程池來處理這些請求則線程總數爲50000。通常線程池大小是遠小於50000。

因此利用線程池的服務器程序不會爲了建立50000而在處理請求時浪費時間,從而提升效率。

 

合理利用線程池可以帶來三個好處:

第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。

第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。

第三:提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。

        可是要作到合理的利用線程池,必須對其原理了如指掌。

 

2 線程池的繼承架構

程序啓動一個新線程成本是比較高的,由於它涉及到要與操做系統進行交互。而使用線程池能夠很好的提升性能,尤爲是當程序中要建立大量生存期很短的線程時,更應該考慮使用線程池。

線程池裏的每個線程代碼結束後,並不會死亡,而是再次回到線程池中成爲空閒狀態,等待下一個對象來使用。

在JDK5以前,咱們必須手動實現本身的線程池,從JDK5開始,Java內置支持線程池

 

Java裏面線程池的頂級接口是Executor,可是嚴格意義上講Executor並非一個線程池,而只是一個執行線程的工具。

真正的線程池接口是ExecutorService。下面這張圖完整描述了線程池的類體系結構。

Executor是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable),返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的;

 

而後ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法;

而後ThreadPoolExecutor繼承了類AbstractExecutorService。

 

標記一下比較重要的類:

ExecutorService:

真正的線程池接口。

ScheduledExecutorService

能和Timer/TimerTask相似,解決那些須要任務重複執行的問題。

ThreadPoolExecutor

ExecutorService的默認實現。

ScheduledThreadPoolExecutor

繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,週期性任務調度的類實現。

 

要配置一個線程池是比較複雜的,尤爲是對於線程池的原理不是很清楚的狀況下,頗有可能配置的線程池不是較優的,

所以在Executors類裏面提供了一些靜態工廠,生成一些經常使用的線程池。

 

newSingleThreadExecutor:建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。

                                           若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。

newFixedThreadPool:建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。

                                   線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。

newCachedThreadPool:建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,

                                       當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。

                                       此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。

newScheduledThreadPool:建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。

newSingleThreadExecutor:建立一個單線程的線程池。此線程池支持定時以及週期性執行任務的需求。

 

這些方法的返回值是ExecutorService對象,該對象表示一個線程池,能夠執行Runnable對象或者Callable對象表明的線程。

它提供了以下方法來提交一個任務:

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

 Callable 與 Runable的相關內容參見:

03 建立線程的第3式 
02 如何建立線程 線程併發與synchornized
 
3 使用線程池步驟及案例

線程池的好處:線程池裏的每個線程代碼結束後,並不會死亡,而是再次回到線程池中成爲空閒狀態,等待下一個對象來使用。

如何實現線程的代碼呢?

A:建立一個線程池對象,控制要建立幾個線程對象。

    public static ExecutorService newFixedThreadPool(int nThreads)

B:這種線程池的線程能夠執行:

  能夠執行Runnable對象或者Callable對象表明的線程

  作一個類實現Runnable接口。

C:調用以下方法便可

    Future<?> submit(Runnable task)

    <T> Future<T> submit(Callable<T> task)

D:我就要結束,能夠嗎? 能夠。

 1 package com.jt.thread.demo05;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 class MyRunnable implements Runnable {
 7     @Override
 8     public void run() {
 9         for (int x = 0; x < 100; x++) {
10             System.out.println(Thread.currentThread().getName() + ":" + x);
11        }
12     }
13 }
14 
15 public class ExecutorServiceDemo {
16     public static void main(String[] args) {
17      // 建立一個線程池對象,控制要建立幾個線程對象。
18      // public static ExecutorService newFixedThreadPool(int nThreads)
19      ExecutorService pool = Executors.newFixedThreadPool(2);
20 
21      // 能夠執行Runnable對象或者Callable對象表明的線程
22      pool.submit(new MyRunnable());
23      pool.submit(new MyRunnable());
24 
25     //結束線程池
26     pool.shutdown();
27    }
28 } 

運行結果:

pool-1-thread-1:0

pool-1-thread-1:1

pool-1-thread-1:2

pool-1-thread-2:0

pool-1-thread-2:1

pool-1-thread-2:2

pool-1-thread-2:3

。。。

 

說明:

 

(1 newFixedThreadPool

是固定大小的線程池 有結果可見 咱們指定2 在運行時就只有2個線程工做

若其中有一個線程異常  會有新的線程替代他

 

(2 shutdown方法有2個重載:

void shutdown() 啓動一次順序關閉,等待執行之前提交的任務完成,但不接受新任務。

List<Runnable> shutdownNow() 試圖當即中止全部正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。

 

(3 submit 與 execute

3.1 submit是ExecutorService中的方法 用以提交一個任務

他的返回值是future對象  能夠獲取執行結果

<T> Future<T> submit(Callable<T> task) 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。

Future<?> submit(Runnable task) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。

<T> Future<T> submit(Runnable task, T result) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。

 

3.2 execute是Executor接口的方法

他雖然也能夠像submit那樣讓一個任務執行  但並不能有返回值

 

void execute(Runnable command)

在將來某個時間執行給定的命令。該命令可能在新的線程、已入池的線程或者正調用的線程中執行,這由 Executor 實現決定。

 

(4 Future

Future 表示異步計算的結果。

它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。

計算完成後只能使用 get 方法來獲取結果,若有必要,計算完成前能夠阻塞此方法。

取消則由 cancel 方法來執行。還提供了其餘方法,以肯定任務是正常完成仍是被取消了。一旦計算完成,就不能再取消計算。

若是爲了可取消性而使用 Future 但又不提供可用的結果,則能夠聲明 Future<?> 形式類型、並返回 null 做爲底層任務的結果。

 

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。

必要時能夠經過get方法獲取執行結果,該方法會阻塞直到任務返回結果。 

也就是說Future提供了三種功能:

--判斷任務是否完成;

--可以中斷任務;

--可以獲取任務執行結果。

 

boolean cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。

V get() 若有必要,等待計算完成,而後獲取其結果。

V get(long timeout, TimeUnit unit) 若有必要,最多等待爲使計算完成所給定的時間以後,獲取其結果(若是結果可用)。

boolean isCancelled() 若是在任務正常完成前將其取消,則返回 true。

boolean isDone() 若是任務已完成,則返回 true。

 

4 線程池簡單使用案例2

java.util.concurrent.Executors類的API提供大量建立鏈接池的靜態方法:

 

 1 import java.util.concurrent.Executors;
 2 import java.util.concurrent.ExecutorService;
 3 public class JavaThreadPool {
 4   public static void main(String[] args) {
 5     // 建立一個可重用固定線程數的線程池
 6     ExecutorService pool = Executors.newFixedThreadPool(2);
 7     // 建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口
 8     Thread t1 = new MyThread();
 9     Thread t2 = new MyThread();
10     Thread t3 = new MyThread();
11     Thread t4 = new MyThread();
12     Thread t5 = new MyThread();
13     // 將線程放入池中進行執行
14     pool.execute(t1);
15     pool.execute(t2);
16     pool.execute(t3);
17     pool.execute(t4);
18     pool.execute(t5);
19     // 關閉線程池
20     pool.shutdown();
21    }
22 }
23 class MyThread extends Thread {
24    @Override
25    public void run() {
26     System.out.println(Thread.currentThread().getName() + "正在執行… …");
27    }
28 }

運行效果示例:

 

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-1正在執行… …

 

 

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-2正在執行… …

 

可見線程池中有2個線程在工做,可見 newFixedThreadPool 是固定大小的線程池

 

5 單任務線程池:

//建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。

ExecutorService pool = Executors.newSingleThreadExecutor(); 

案例:

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 public class SingleThreadPollDemo {
 5 
 6 public static void main(String[] args) {
 7    // 建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。
 8    ExecutorService pool = Executors.newSingleThreadExecutor();
 9 
10    Runnable task1 = new SingelTask();
11    Runnable task2 = new SingelTask();
12    Runnable task3 = new SingelTask();
13 
14    pool.execute(task3);
15    pool.execute(task2);
16    pool.execute(task1);
17 
18    // 等待已提交的任務所有結束 再也不接受新的任務
19    pool.shutdown();
20   }
21 }
22 
23 class SingelTask implements Runnable{
24 
25 @Override
26 public void run() {
27   System.out.println(Thread.currentThread().getName() + "正在執行… …");
28   try {
29    Thread.sleep(3000);
30   } catch (InterruptedException e) {
31    e.printStackTrace();
32   }
33   System.out.println(Thread.currentThread().getName() + "執行完畢");
34  }
35 
36 }

運行結果:

 

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

 

可見線程池中只有一個線程在執行任務

6 小結:

對於以上兩種鏈接池,大小都是固定的,當要加入的池的線程(或者任務)超過池最大尺寸時候,則入此線程池須要排隊等待。

一旦池中有線程完畢,則排隊等待的某個線程會入池執行。

 

其餘線程池示例:

 

固定大小線程池

import java.util.concurrent.Executors; 

import java.util.concurrent.ExecutorService;

ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(t1);

pool.shutdown();

 

單任務線程池

ExecutorService pool = Executors.newSingleThreadExecutor();

 

可變尺寸線程池

ExecutorService pool = Executors.newCachedThreadPool();

 

延遲鏈接池

import java.util.concurrent.Executors; 

import java.util.concurrent.ScheduledExecutorService; 

import java.util.concurrent.TimeUnit;

ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

pool.schedule(t4, 10, TimeUnit.MILLISECONDS);

 

單任務延遲鏈接池

ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

 

7 使用示例

 1 public class Test {
 2      public static void main(String[] args) {   
 3          ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
 4                  new ArrayBlockingQueue<Runnable>(5));
 5           
 6          for(int i=0;i<15;i++){
 7              MyTask myTask = new MyTask(i);
 8              executor.execute(myTask);
 9              System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
10              executor.getQueue().size()+",已執行完別的任務數目:"+executor.getCompletedTaskCount());
11          }
12          executor.shutdown();
13      }
14 }
15  
16  
17 class MyTask implements Runnable {
18     private int taskNum;
19      
20     public MyTask(int num) {
21         this.taskNum = num;
22     }
23      
24     @Override
25     public void run() {
26         System.out.println("正在執行task "+taskNum);
27         try {
28             Thread.currentThread().sleep(4000);
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32         System.out.println("task "+taskNum+"執行完畢");
33     }
34 }

執行結果:

正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 10
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 11
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 12
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 13
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 14
task 3執行完畢
task 0執行完畢
task 2執行完畢
task 1執行完畢
正在執行task 8
正在執行task 7
正在執行task 6
正在執行task 5
task 4執行完畢
task 10執行完畢
task 11執行完畢
task 13執行完畢
task 12執行完畢
正在執行task 9
task 14執行完畢
task 8執行完畢
task 5執行完畢
task 7執行完畢
task 6執行完畢
task 9執行完畢
View Code

從執行結果能夠看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了以後,便建立新的線程。

若是上面程序中,將for循環中改爲執行20個任務,就會拋出任務拒絕異常了。

不過在java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池:

1 Executors.newCachedThreadPool();        //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
2 Executors.newSingleThreadExecutor();    //建立容量爲1的緩衝池
3 Executors.newFixedThreadPool(int);      //建立固定容量大小的緩衝池

下面是這三個靜態方法的具體實現:

 1 public static ExecutorService newFixedThreadPool(int nThreads) {
 2     return new ThreadPoolExecutor(nThreads, nThreads,
 3                                   0L, TimeUnit.MILLISECONDS,
 4                                   new LinkedBlockingQueue<Runnable>());
 5 }
 6 public static ExecutorService newSingleThreadExecutor() {
 7     return new FinalizableDelegatedExecutorService
 8         (new ThreadPoolExecutor(1, 1,
 9                                 0L, TimeUnit.MILLISECONDS,
10                                 new LinkedBlockingQueue<Runnable>()));
11 }
12 public static ExecutorService newCachedThreadPool() {
13     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
14                                   60L, TimeUnit.SECONDS,
15                                   new SynchronousQueue<Runnable>());
16 }

     從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

  newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置爲1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運行,當線程空閒超過60秒,就銷燬線程。

  實際中,若是Executors提供的三個靜態方法能知足要求,就儘可能使用它提供的三個方法,由於本身去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,若是ThreadPoolExecutor達不到要求,能夠本身繼承ThreadPoolExecutor類進行重寫。

相關文章
相關標籤/搜索