咱們都知道,在JDK1.5以前,Java中要進行業務併發時,一般須要有程序員獨立完成代碼實現,固然也有一些開源的框架提供了這些功能,可是這些依然沒有JDK自帶的功能使用起來方便。而當針對高質量Java多線程併發程序設計時,爲防止死蹦等現象的出現,好比使用java以前的wait()、notify()和synchronized等,往往須要考慮性能、死鎖、公平性、資源管理以及如何避免線程安全性方面帶來的危害等諸多因素,每每會採用一些較爲複雜的安全策略,加劇了程序員的開發負擔.萬幸的是,在JDK1.5出現以後,Sun大神(Doug Lea)終於爲咱們這些可憐的小程序員推出了java.util.concurrent工具包以簡化併發完成。開發者們藉助於此,將有效的減小競爭條件(race conditions)和死鎖線程。concurrent包很好的解決了這些問題,爲咱們提供了更實用的併發程序模型。
Executor :具體Runnable任務的執行者。
ExecutorService :一個線程池管理者,其實現類有多種,我會介紹一部分。咱們能把Runnable,Callable提交到池中讓其調度。
Semaphore :一個計數信號量
ReentrantLock :一個可重入的互斥鎖定 Lock,功能相似synchronized,但要強大的多。
Future :是與Runnable,Callable進行交互的接口,好比一個線程執行結束後取返回的結果等等,還提供了cancel終止線程。
BlockingQueue :阻塞隊列。
CompletionService : ExecutorService的擴展,能夠得到線程執行結果的
CountDownLatch :一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。
CyclicBarrier :一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點
Future :Future 表示異步計算的結果。
ScheduledExecutorService :一個 ExecutorService,可安排在給定的延遲後運行或按期執行的命令。
接下來逐一介紹
Executors主要方法說明
newFixedThreadPool(固定大小線程池)
建立一個可重用固定線程集合的線程池,以共享的無界隊列方式來運行這些線程(只有要請求的過來,就會在一個隊列裏等待執行)。若是在關閉前的執行期間因爲失敗而致使任何線程終止,那麼一個新線程將代替它執行後續的任務(若是須要)。
newCachedThreadPool(無界線程池,能夠進行自動線程回收)
建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們。對於執行不少短時間異步任務的程序而言,這些線程池一般可提升程序性能。調用 execute 將重用之前構造的線程(若是線程可用)。若是現有線程沒有可用的,則建立一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。所以,長時間保持空閒的線程池不會使用任何資源。注意,可使用 ThreadPoolExecutor 構造方法建立具備相似屬性但細節不一樣(例如超時參數)的線程池。
newSingleThreadExecutor(單個後臺線程)
建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。(注意,若是由於在關閉前的執行期間出現失敗而終止了此單個線程,那麼若是須要,一個新線程將代替它執行後續的任務)。可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的。與其餘等效的 newFixedThreadPool(1) 不一樣,可保證無需從新配置此方法所返回的執行程序便可使用其餘的線程。
這些方法返回的都是ExecutorService對象,這個對象能夠理解爲就是一個線程池。
這個線程池的功能仍是比較完善的。能夠提交任務submit()能夠結束線程池shutdown()。
01 |
import java.util.concurrent.ExecutorService; |
02 |
import java.util.concurrent.Executors; |
03 |
public class MyExecutor extends Thread { |
05 |
public MyExecutor( int i){ |
10 |
System.out.println( "[" + this .index+ "] start...." ); |
11 |
Thread.sleep(( int )(Math.random()* 1000 )); |
12 |
System.out.println( "[" + this .index+ "] end." ); |
18 |
public static void main(String args[]){ |
19 |
ExecutorService service=Executors.newFixedThreadPool( 4 ); |
20 |
for ( int i= 0 ;i< 10 ;i++){ |
21 |
service.execute( new MyExecutor(i)); |
24 |
System.out.println( "submit finish" ); |
雖然打印了一些信息,可是看的不是很是清晰,這個線程池是如何工做的,咱們來將休眠的時間調長10倍。
Thread.sleep((int)(Math.random()*10000));
再來看,會清楚看到只能執行4個線程。當執行完一個線程後,纔會又執行一個新的線程,也就是說,咱們將全部的線程提交後,線程池會等待執行完最後shutdown。咱們也會發現,提交的線程被放到一個「無界隊列裏」。這是一個有序隊列(BlockingQueue,這個下面會說到)。
另外它使用了Executors的靜態函數生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即便它是Idle。
這就會產生性能問題,好比若是線程池的大小爲200,當所有使用完畢後,全部的線程會繼續留在池中,相應的內存和線程切換(while(true)+sleep循環)都會增長。
若是要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。能夠像通用的線程池同樣設置「最大線程數」、「最小線程數」和「空閒線程keepAlive的時間」。
這個就是線程池基本用法。
Semaphore
一個計數信號量。從概念上講,信號量維護了一個許可集合。若有必要,在許可可用前會阻塞每個 acquire(),而後再獲取該許可。每一個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。可是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並採起相應的行動。
Semaphore 一般用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。例如,下面的類使用信號量控制對內容池的訪問:
這裏是一個實際的狀況,你們排隊上廁所,廁所只有兩個位置,來了10我的須要排隊。
01 |
import java.util.concurrent.ExecutorService; |
02 |
import java.util.concurrent.Executors; |
03 |
import java.util.concurrent.Semaphore; |
04 |
public class MySemaphore extends Thread { |
07 |
public MySemaphore( int i,Semaphore s){ |
13 |
if (position.availablePermits()> 0 ){ |
14 |
System.out.println( "顧客[" + this .id+ "]進入廁所,有空位" ); |
17 |
System.out.println( "顧客[" + this .id+ "]進入廁所,沒空位,排隊" ); |
20 |
System.out.println( "顧客[" + this .id+ "]得到坑位" ); |
21 |
Thread.sleep(( int )(Math.random()* 1000 )); |
22 |
System.out.println( "顧客[" + this .id+ "]使用完畢" ); |
29 |
public static void main(String args[]){ |
30 |
ExecutorService list=Executors.newCachedThreadPool(); |
31 |
Semaphore position= new Semaphore( 2 ); |
32 |
for ( int i= 0 ;i< 10 ;i++){ |
33 |
list.submit( new MySemaphore(i+ 1 ,position)); |
36 |
position.acquireUninterruptibly( 2 ); |
37 |
System.out.println( "使用完畢,須要清掃了" ); |
ReentrantLock
一個可重入的互斥鎖定 Lock,它具備與使用 synchronized 方法和語句所訪問的隱式監視器鎖定相同的一些基本行爲和語義,但功能更強大。
ReentrantLock 將由最近成功得到鎖定,而且尚未釋放該鎖定的線程所擁有。當鎖定沒有被另外一個線程所擁有時,調用 lock 的線程將成功獲取該鎖定並返回。若是當前線程已經擁有該鎖定,此方法將當即返回。可使用 isHeldByCurrentThread() 和 getHoldCount() 方法來檢查此狀況是否發生。
此類的構造方法接受一個可選的公平參數。
當設置爲 true時,在多個線程的爭用下,這些鎖定傾向於將訪問權授予等待時間最長的線程。不然此鎖定將沒法保證任何特定訪問順序。
與採用默認設置(使用不公平鎖定)相比,使用公平鎖定的程序在許多線程訪問時表現爲很低的整體吞吐量(即速度很慢,經常極其慢),可是在得到鎖定和保證鎖定分配的均衡性時差別較小。不過要注意的是,公平鎖定不能保證線程調度的公平性。所以,使用公平鎖定的衆多線程中的一員可能得到多倍的成功機會,這種狀況發生在其餘活動線程沒有被處理而且目前並未持有鎖定時。還要注意的是,未定時的 tryLock 方法並無使用公平設置。由於即便其餘線程正在等待,只要該鎖定是可用的,此方法就能夠得到成功。
建議老是 當即實踐,使用 try 塊來調用 lock,在以前/以後的構造中,最典型的代碼以下:
02 |
private final ReentrantLock lock = new ReentrantLock(); |
個人例子:
01 |
import java.util.concurrent.ExecutorService; |
02 |
import java.util.concurrent.Executors; |
03 |
import java.util.concurrent.locks.ReentrantLock; |
04 |
public class MyReentrantLock extends Thread{ |
05 |
TestReentrantLock lock; |
07 |
public MyReentrantLock( int i,TestReentrantLock test){ |
14 |
public static void main(String args[]){ |
15 |
ExecutorService service=Executors.newCachedThreadPool(); |
16 |
TestReentrantLock lock= new TestReentrantLock(); |
17 |
for ( int i= 0 ;i< 10 ;i++){ |
18 |
service.submit( new MyReentrantLock(i,lock)); |
23 |
class TestReentrantLock{ |
24 |
private ReentrantLock lock= new ReentrantLock(); |
25 |
public void print( int str){ |
28 |
System.out.println(str+ "得到" ); |
29 |
Thread.sleep(( int )(Math.random()* 1000 )); |
35 |
System.out.println(str+ "釋放" ); |
BlockingQueue
支持兩個附加操做的 Queue,這兩個操做是:檢索元素時等待隊列變爲非空,以及存儲元素時等待空間變得可用。
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用做指示 poll 操做失敗的警惕值。
BlockingQueue 能夠是限定容量的。它在任意給定時間均可以有一個 remainingCapacity,超出此容量,便沒法無阻塞地 put 額外的元素。
沒有任何內部容量約束的 BlockingQueue 老是報告 Integer.MAX_VALUE 的剩餘容量。
BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持 Collection 接口。所以,舉例來講,使用 remove(x) 從隊列中移除任意一個元素是有可能的。
然而,這種操做一般不 會有效執行,只能有計劃地偶爾使用,好比在取消排隊信息時。
BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內部鎖定或其餘形式的併發控制來自動達到它們的目的。
然而,大量的 Collection 操做(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。
所以,舉例來講,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。
BlockingQueue 實質上不 支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。
這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。
下面的例子演示了這個阻塞隊列的基本功能。
01 |
import java.util.concurrent.BlockingQueue; |
02 |
import java.util.concurrent.ExecutorService; |
03 |
import java.util.concurrent.Executors; |
04 |
import java.util.concurrent.LinkedBlockingQueue; |
05 |
public class MyBlockingQueue extends Thread { |
06 |
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>( 3 ); |
08 |
public MyBlockingQueue( int i) { |
13 |
queue.put(String.valueOf( this .index)); |
14 |
System.out.println( "{" + this .index + "} in queue!" ); |
15 |
} catch (Exception e) { |
19 |
public static void main(String args[]) { |
20 |
ExecutorService service = Executors.newCachedThreadPool(); |
21 |
for ( int i = 0 ; i < 10 ; i++) { |
22 |
service.submit( new MyBlockingQueue(i)); |
24 |
Thread thread = new Thread() { |
28 |
Thread.sleep(( int ) (Math.random() * 1000 )); |
29 |
if (MyBlockingQueue.queue.isEmpty()) |
31 |
String str = MyBlockingQueue.queue.take(); |
32 |
System.out.println(str + " has take!" ); |
34 |
} catch (Exception e) { |
39 |
service.submit(thread); |
---------------------執行結果-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!
-----------------------------------------
CompletionService
將生產新的異步任務與使用已完成任務的結果分離開來的服務。生產者 submit 執行的任務。使用者 take 已完成的任務,
並按照完成這些任務的順序處理它們的結果。例如,CompletionService 能夠用來管理異步 IO ,執行讀操做的任務做爲程序或系統的一部分提交,
而後,當完成讀操做時,會在程序的不一樣部分執行其餘操做,執行操做的順序可能與所請求的順序不一樣。
一般,CompletionService 依賴於一個單獨的 Executor 來實際執行任務,在這種狀況下,
CompletionService 只管理一個內部完成隊列。ExecutorCompletionService 類提供了此方法的一個實現。
01 |
import java.util.concurrent.Callable; |
02 |
import java.util.concurrent.CompletionService; |
03 |
import java.util.concurrent.ExecutorCompletionService; |
04 |
import java.util.concurrent.ExecutorService; |
05 |
import java.util.concurrent.Executors; |
06 |
public class MyCompletionService implements Callable<String> { |
09 |
public MyCompletionService( int i){ |
12 |
public static void main(String[] args) throws Exception{ |
13 |
ExecutorService service=Executors.newCachedThreadPool(); |
14 |
CompletionService<String> completion= new ExecutorCompletionService<String>(service); |
15 |
for ( int i= 0 ;i< 10 ;i++){ |
16 |
completion.submit( new MyCompletionService(i)); |
18 |
for ( int i= 0 ;i< 10 ;i++){ |
19 |
System.out.println(completion.take().get()); |
23 |
public String call() throws Exception { |
24 |
Integer time=( int )(Math.random()* 1000 ); |
26 |
System.out.println( this .id+ " start" ); |
28 |
System.out.println( this .id+ " end" ); |
33 |
return this .id+ ":" +time; |
CountDownLatch
一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。
用給定的計數 初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。
以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。這種現象只出現一次——計數沒法被重置。若是須要重置計數,請考慮使用 CyclicBarrier。
CountDownLatch 是一個通用同步工具,它有不少用途。將計數 1 初始化的 CountDownLatch 用做一個簡單的開/關鎖存器,
或入口:在經過調用 countDown() 的線程打開入口前,全部調用 await 的線程都一直在入口處等待。
用 N 初始化的 CountDownLatch 可使一個線程在 N 個線程完成某項操做以前一直等待,或者使其在某項操做完成 N 次以前一直等待。
CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,
而在全部線程都能經過以前,它只是阻止任何線程繼續經過一個 await。
一下的例子是別人寫的,很是形象。
01 |
import java.util.concurrent.CountDownLatch; |
02 |
import java.util.concurrent.ExecutorService; |
03 |
import java.util.concurrent.Executors; |
04 |
public class TestCountDownLatch { |
05 |
public static void main(String[] args) throws InterruptedException { |
07 |
final CountDownLatch begin = new CountDownLatch( 1 ); |
09 |
final CountDownLatch end = new CountDownLatch( 10 ); |
11 |
final ExecutorService exec = Executors.newFixedThreadPool( 10 ); |
13 |
for ( int index = 0 ; index < 10 ; index++) { |
14 |
final int NO = index + 1 ; |
15 |
Runnable run = new Runnable() { |
19 |
Thread.sleep(( long ) (Math.random() * 10000 )); |
20 |
System.out.println( "No." + NO + " arrived" ); |
21 |
} catch (InterruptedException e) { |
29 |
System.out.println( "Game Start" ); |
32 |
System.out.println( "Game Over" ); |
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數一次,後者是等待倒數到0,若是沒有到達0,就只有阻塞等待了。
CyclicBarrier
一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。
在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),
該命令只在每一個屏障點運行一次。若在繼續全部參與線程以前更新共享狀態,此屏障操做 頗有用。
示例用法:下面是一個在並行分解設計中使用 barrier 的例子,很經典的旅行團例子:
01 |
import java.text.SimpleDateFormat; |
02 |
import java.util.Date; |
03 |
import java.util.concurrent.BrokenBarrierException; |
04 |
import java.util.concurrent.CyclicBarrier; |
05 |
import java.util.concurrent.ExecutorService; |
06 |
import java.util.concurrent.Executors; |
07 |
public class TestCyclicBarrier { |
09 |
private static int [] timeWalk = { 5 , 8 , 15 , 15 , 10 }; |
11 |
private static int [] timeSelf = { 1 , 3 , 4 , 4 , 5 }; |
13 |
private static int [] timeBus = { 2 , 4 , 6 , 6 , 7 }; |
16 |
SimpleDateFormat sdf = new SimpleDateFormat( "HH:mm:ss" ); |
17 |
return sdf.format( new Date()) + ": " ; |
19 |
static class Tour implements Runnable { |
21 |
private CyclicBarrier barrier; |
22 |
private String tourName; |
23 |
public Tour(CyclicBarrier barrier, String tourName, int [] times) { |
25 |
this .tourName = tourName; |
26 |
this .barrier = barrier; |
30 |
Thread.sleep(times[ 0 ] * 1000 ); |
31 |
System.out.println(now() + tourName + " Reached Shenzhen" ); |
33 |
Thread.sleep(times[ 1 ] * 1000 ); |
34 |
System.out.println(now() + tourName + " Reached Guangzhou" ); |
36 |
Thread.sleep(times[ 2 ] * 1000 ); |
37 |
System.out.println(now() + tourName + " Reached Shaoguan" ); |
39 |
Thread.sleep(times[ 3 ] * 1000 ); |
40 |
System.out.println(now() + tourName + " Reached Changsha" ); |
42 |
Thread.sleep(times[ 4 ] * 1000 ); |
43 |
System.out.println(now() + tourName + " Reached Wuhan" ); |
45 |
} catch (InterruptedException e) { |
46 |
} catch (BrokenBarrierException e) { |
50 |
public static void main(String[] args) { |
52 |
CyclicBarrier barrier = new CyclicBarrier( 3 ); |
53 |
ExecutorService exec = Executors.newFixedThreadPool( 3 ); |
54 |
exec.submit( new Tour(barrier, "WalkTour" , timeWalk)); |
55 |
exec.submit( new Tour(barrier, "SelfTour" , timeSelf)); |
57 |
exec.submit( new Tour(barrier, "BusTour" , timeBus)); |
CyclicBarrier最重要的屬性就是參與者個數,另外最要方法是await()。當全部線程都調用了await()後,就表示這些線程均可以繼續執行,不然就會等待。
Future
Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。
計算完成後只能使用 get 方法來檢索結果,若有必要,計算完成前能夠阻塞此方法。取消則由 cancel 方法來執行。
還提供了其餘方法,以肯定任務是正常完成仍是被取消了。一旦計算完成,就不能再取消計算。
若是爲了可取消性而使用 Future但又不提供可用的結果,則能夠聲明 Future<?> 形式類型、並返回 null 做爲基礎任務的結果。
這個咱們在前面CompletionService已經看到了,這個Future的功能,並且這個能夠在提交線程的時候被指定爲一個返回對象的。
ScheduledExecutorService
一個 ExecutorService,可安排在給定的延遲後運行或按期執行的命令。
schedule 方法使用各類延遲建立任務,並返回一個可用於取消或檢查執行的任務對象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法建立並執行某些在取消前一直按期運行的任務。
用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,經過所請求的 0 延遲進行安排。
schedule 方法中容許出現 0 和負數延遲(但不是週期),並將這些視爲一種當即執行的請求。
全部的 schedule 方法都接受相對 延遲和週期做爲參數,而不是絕對的時間或日期。將以 Date 所表示的絕對時間轉換成要求的形式很容易。
例如,要安排在某個之後的日期運行,可使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
可是要注意,因爲網絡時間同步協議、時鐘漂移或其餘因素的存在,所以相對延遲的期滿日期沒必要與啓用任務的當前 Date 相符。
Executors 類爲此包中所提供的 ScheduledExecutorService 實現提供了便捷的工廠方法。
一下的例子也是網上比較流行的。
01 |
import static java.util.concurrent.TimeUnit.SECONDS; |
02 |
import java.util.Date; |
03 |
import java.util.concurrent.Executors; |
04 |
import java.util.concurrent.ScheduledExecutorService; |
05 |
import java.util.concurrent.ScheduledFuture; |
06 |
public class TestScheduledThread { |
07 |
public static void main(String[] args) { |
08 |
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 2 ); |
09 |
final Runnable beeper = new Runnable() { |
12 |
System.out.println( new Date() + " beep " + (++count)); |
16 |
final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1 , 2 , SECONDS); |
18 |
final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2 , 5 , SECONDS); |
20 |
scheduler.schedule( new Runnable() { |
22 |
beeperHandle.cancel( true ); |
23 |
beeperHandle2.cancel( true ); |
這樣咱們就把concurrent包下比較重要的功能都已經總結完了,但願對咱們理解能有幫助。html