前言:多線程併發在咱們的業務場景很常見,可是爲了解決多線程的併發問題並不容易,即使你是中高級程序員亦或是架構師,你都不必定可以吃透並解決多線程併發場景所暴露的問題,今天拋磚引玉來給你們介紹下多程序併發的五種同步輔助類,但願在線程併發上能爲你們之後提供一下思路以及解決問題。html
1.CountDownLatch java
CountDownLatch 是一種很是簡單、但很經常使用的同步輔助類。其做用是在完成一組正在其餘線程中執行的操做以前,容許一個或多個線程一直阻塞。用給定的計數初始化CountDownLatch。因爲調用了countDown()方法,因此在當前計數到達零以前,await方法會一直受阻塞。以後,會釋放全部等待的線程,await的全部後續調用都將當即返回程序員
示例代碼編程
//建立時,就須要指定參與的parties個數 數組
int parties = 12; 安全
CountDownLatch latch = new CountDownLatch(parties); 多線程
//線程池中同步task 架構
ExecutorService executor = Executors.newFixedThreadPool(parties); 併發
for(int i = 0; i < parties; i++) { dom
executor.execute(new Runnable() {
@Override
public void run() {
try {
//能夠在任務執行開始時執行,表示全部的任務都啓動後,主線程的await便可解除
//latch.countDown();
//run
//..
Thread.sleep(3000);
} catch (Exception e) {
}
finally {
//任務執行完畢後:到達
//表示全部的任務都結束,主線程才能繼續
latch.countDown();
}
}
});
}
latch.await();//主線程阻塞,直到全部的parties到達
//latch上全部的parties都達到後,再次執行await將不會有效,
//即barrier是不可重用的
executor.shutdown();
2.CyclicBarrier
CyclicBarrier 一種可重置的多路同步點,在某些併發編程場景頗有用。它容許一組線程互相等待,直到到達某個公共的屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier在釋放等待線程後能夠重用,因此稱它爲循環的barrier。
下面看看對應的方法。
public CyclicBarrier(int parties, Runnable barrierAction)
建立一個新的CycleBarrier,它將在給定數量的參與者(線程)處於等待狀態時候啓動,並在啓動barrier時執行給定的屏障操做,該操做由最後一個今日的屏障的線程執行。參數barrierAction是在啓動屏障的時候執行命令,若是不執行任何操做則該參數是null。
public int await()
在全部參與者都已經在此barrier上調用 await方法以前,將一直等待。若是當前線程部署將要到達的最後一個線程將禁用它。
reset
將屏障重置爲其初始化狀態,若是全部參與者目前都在屏障處等待,則它們將返回,並且會拋出一個異常。
getNumberWaiting
返回當前在屏障處等待的參與者數目。
3.Semaphore(信號量)
信號量是一類經典的同步工具。信號量一般用來限制線程能夠同時訪問的(物理或邏輯)資源數量。
咱們以一個停車場運做爲例來講明信號量的做用。假設停車場只有三個車位,一開始三個車位都是空的。這時若是同時來了三輛車,看門人容許其中它們進入進入,而後放下車攔。之後來的車必須在入口等待,直到停車場中有車輛離開。這時,若是有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開一輛,則又能夠放入一輛,如此往復。
在這個停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是信號量的做用。信號量是一個非負整數,表示了當前公共資源的可用數目(在上面的例子中能夠用空閒的停車位類比信號量),當一個線程要使用公共資源時(在上面的例子中能夠用車輛類比線程),首先要查看信號量,若是信號量的值大於1,則將其減1,而後去佔有公共資源。若是信號量的值爲0,則線程會將本身阻塞,直到有其它線程釋放公共資源。
在信號量上咱們定義兩種操做: acquire(獲取) 和 release(釋放)。當一個線程調用acquire操做時,它要麼經過成功獲取信號量(信號量減1),要麼一直等下去,直到有線程釋放信號量,或超時。release(釋放)實際上會將信號量的值加1,而後喚醒等待的線程。
信號量主要用於兩個目的,一個是用於多個共享資源的互斥使用,另外一個用於併發線程數的控制。
使用示例
papublic class SemaphoreDemo {
private Semaphore smp = new Semaphore(3); private Random rnd = new Random(); class TaskDemo implements Runnable{
private String id; TaskDemo(String id){
this.id = id; }
public void run(){
try { smp.acquire(); System.out.println("Thread " + id + " is working"); Thread.sleep(rnd.nextInt(1000)); smp.release(); System.out.println("Thread " + id + " is over"); } catch (InterruptedException e) { } } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); //注意我建立的線程池類型, ExecutorService se = Executors.newCachedThreadPool(); se.submit(semaphoreDemo.new TaskDemo("a")); se.submit(semaphoreDemo.new TaskDemo("b")); se.submit(semaphoreDemo.new TaskDemo("c")); se.submit(semaphoreDemo.new TaskDemo("d")); se.submit(semaphoreDemo.new TaskDemo("e")); se.submit(semaphoreDemo.new TaskDemo("f")); se.shutdown(); }}
運行結果
Thread c is working
Thread b is working
Thread a is working
Thread c is over
Thread d is working
Thread b is over
Thread e is working
Thread a is over
Thread f is working
Thread d is over
Thread e is over
Thread f is over
能夠看出,最多同時有三個線程併發執行,也能夠認爲有三個公共資源(好比計算機的三個串口)。
參考內容:http://www.cnblogs.com/nullzx/p/5270233.html
4.Phaser
在JAVA 1.7引入了一個新的併發API:Phaser,一個可重用的同步barrier。在此前,JAVA已經有CyclicBarrier、CountDownLatch這兩種同步barrier,可是Phaser更加靈活,並且側重於「重用」。
API簡述
一、Phaser():構造函數,建立一個Phaser;默認parties個數爲0。此後咱們能夠經過register()、bulkRegister()方法來註冊新的parties。每一個Phaser實例內部,都持有幾個狀態數據:termination狀態、已經註冊的parties個數(registeredParties)、當前phase下已到達的parties個數(arrivedParties)、當前phase週期數,還有2個同步阻塞隊列Queue。Queue中保存了全部的waiter,即由於advance而等待的線程信息;這兩個Queue分別爲evenQ和oddQ,這兩個Queue在實現上沒有任何區別,Queue的元素爲QNode,每一個QNode保存一個waiter的信息,好比Thread引用、阻塞的phase、超時的deadline、是否支持interrupted響應等。兩個Queue,其中一個保存當前phase中正在使用的waiter,另外一個備用,當phase爲奇數時使用evenQ、oddQ備用,偶數時相反,即兩個Queue輪換使用。當advance事件觸發期間,新register的parties將會被放在備用的Queue中,advance只須要響應另外一個Queue中的waiters便可,避免出現混亂。
二、Phaser(int parties):構造函數,初始必定數量的parties;至關於直接regsiter此數量的parties。
三、arrive():到達,阻塞,等到當前phase下其餘parties到達。若是沒有register(即已register數量爲0),調用此方法將會拋出異常,此方法返回當前phase週期數,若是Phaser已經終止,則返回負數。
四、arriveAndDeregister():到達,並註銷一個parties數量,非阻塞方法。註銷,將會致使Phaser內部的parties個數減一(隻影響當前phase),即下一個phase須要等待arrive的parties數量將減一。異常機制和返回值,與arrive方法一致。
五、arriveAndAwaitAdvance():到達,且阻塞直到其餘parties都到達,且advance。此方法等同於awaitAdvance(arrive())。若是你但願阻塞機制支持timeout、interrupted響應,可使用相似的其餘方法(參見下文)。若是你但願到達後且註銷,並且阻塞等到當前phase下其餘的parties到達,可使用awaitAdvance(arriveAndDeregister())方法組合。此方法的異常機制和返回值同arrive()。
六、awaitAdvance(int phase):阻塞方法,等待phase週期數下其餘全部的parties都到達。若是指定的phase與Phaser當前的phase不一致,則當即返回。
七、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted響應,即waiter線程若是被外部中斷,則此方法當即返回,並拋出InterrutedException。
八、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout類型的interrupted響應,即當前線程阻塞等待約定的時長,超時後以TimeoutException異常方式返回。
九、forceTermination():強制終止,此後Phaser對象將不可用,即register等將再也不有效。此方法將會致使Queue中全部的waiter線程被喚醒。
十、register():新註冊一個party,致使Phaser內部registerPaties數量加1;若是此時onAdvance方法正在執行,此方法將會等待它執行完畢後纔會返回。此方法返回當前的phase週期數,若是Phaser已經中斷,將會返回負數。
十一、bulkRegister(int parties):批量註冊多個parties數組,規則同十、。
十二、getArrivedParties():獲取已經到達的parties個數。
1三、getPhase():獲取當前phase週期數。若是Phaser已經中斷,則返回負值。
1四、getRegisteredParties():獲取已經註冊的parties個數。
1五、getUnarrivedParties():獲取還沒有到達的parties個數。
代碼示例:
//建立時,就須要指定參與的parties個數
int parties = 12;
//能夠在建立時不指定parties
// 而是在運行時,隨時註冊和註銷新的parties
Phaser phaser = new Phaser();
//主線程先註冊一個
//對應下文中,主線程能夠等待全部的parties到達後再解除阻塞(相似與CountDownLatch)
phaser.register();
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {
phaser.register();//每建立一個task,咱們就註冊一個party
executor.execute(new Runnable() {
@Override
public void run() {
try {
int i = 0;
while (i < 3 && !phaser.isTerminated()) {
System.out.println("Generation:" + phaser.getPhase());
Thread.sleep(3000);
//等待同一週期內,其餘Task到達
//而後進入新的週期,並繼續同步進行
phaser.arriveAndAwaitAdvance();
i++;//咱們假定,運行三個週期便可
}
} catch (Exception e) {
}
finally {
phaser.arriveAndDeregister();
}
}
});
}
//主線程到達,且註銷本身
//此後線程池中的線程便可開始按照週期,同步執行。
phaser.arriveAndDeregister();
參考內容:http://shift-alt-ctrl.iteye.com/blog/2302923
5.Exchanger
類java.util.concurrent.Exchanger提供了一個同步點,在這個同步點,一對線程能夠交換數據。每一個線程經過exchange()方法的入口提供數據給他的夥伴線程,並接收他的夥伴線程提供的數據,並返回。
當在運行不對稱的活動時頗有用。好比說,一個線程向buffer中填充數據,另外一個線程從buffer中消費數據;這些線程能夠用Exchange來交換數據。這個交換對於兩個線程來講都是安全的。
package com.clzhang.sample.thread;import java.util.*;import java.util.concurrent.Exchanger;public class SyncExchanger {
private static final Exchanger exchanger = new Exchanger();
class DataProducer implements Runnable {
private List list = new ArrayList(); public void run() {
for (int i = 0; i < 5; i++) { System.out.println("生產了一個數據,耗時1秒"); list.add(new Date()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } try { list = (List) exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); }
for (Iterator iterator = list.iterator(); iterator.hasNext();) { System.out.println("Producer " + iterator.next()); } } }
class DataConsumer implements Runnable {
private List list = new ArrayList();
public void run() {
for (int i = 0; i < 5; i++) { list.add("這是一個收條。"); }
try { list = (List) exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); }
for (Iterator iterator = list.iterator(); iterator.hasNext();) { Date d = (Date) iterator.next(); System.out.println("Consumer: " + d); } } }
public static void main(String[] args) { SyncExchanger ins = new SyncExchanger();
new Thread(ins.new DataProducer()).start();
new Thread(ins.new DataConsumer()).start(); } }
輸出
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
Producer 這是一個收條。
Producer 這是一個收條。
Producer 這是一個收條。
Producer 這是一個收條。
Producer 這是一個收條。
Consumer: Thu Sep 12 17:21:39 CST 2013
Consumer: Thu Sep 12 17:21:40 CST 2013
Consumer: Thu Sep 12 17:21:41 CST 2013
Consumer: Thu Sep 12 17:21:42 CST 2013
Consumer: Thu Sep 12 17:21:43 CST 2013
https://mp.weixin.qq.com/s/JvORQc7fAS7AVIRsO8SdHA