JDK中爲了處理線程之間的同步問題,除了提供鎖機制以外,還提供了幾個很是有用的併發工具類:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
CountDownLatch、CyclicBarrier、Semphore、Phaser 這四個工具類提供一種併發流程的控制手段;而Exchanger工具類則提供了在線程之間交換數據的一種手段。html
Phaser 是JDK1.7版本中新增的,是一個可重用的同步barrier,它的功能與 CountDownLatch、CyclicBarrier 類似,可是使用起來更加靈活。能夠java
構造方法bash
方法名 | 描述 |
---|---|
Phaser() | 構建一個Phaser |
Phaser(int parties) | 建立一個指定屏障數量的Phaser |
Phaser(Phaser parent) | 至關於 Phaser(parent, 0) |
Phaser(Phaser parent, int parties) | 建立一個指定屏障數量的Phaser,此phaser是註冊在另外一個Phaser parent下 |
方法摘要併發
方法名 | 描述 |
---|---|
public int arrive() | 到達此phaser的屏障點,使phaser的到達的線程數加一,但不會阻塞等待其餘線程。
返回:
phase值,即當前階段(週期)的索引,或者是負值(當Phaser 中止時) |
public int arriveAndDeregister() | 到達此phaser的屏障點,使phaser的到達的線程數加一,而且會取消一個屏障點的註冊。也不會阻塞等待其餘線程。
返回:
phase值,即當前階段(週期)的索引,或者是負值(當Phaser 中止時) |
public int arriveAndAwaitAdvance() | 到達此phaser的屏障點,而且阻塞等待其餘線程到達此屏障點。注意:這是
非中斷的阻塞
,此方法與awaitAdvance(arrive())等同。若是你但願阻塞機制支持timeout、interrupted響應,可使用相似的其餘方法(參見下文)。若是你但願到達後且註銷,並且阻塞等到當前phase下其餘的parties到達,可使用awaitAdvance(arriveAndDeregister())方法組合。
返回:
phase值,即當前階段(週期)的索引;若是Phaser 中止,則返回負值 |
public int awaitAdvance(int phase) | 在指定的階段(週期)phase下等待其餘線程到達屏障點,注意:這是
非中斷的阻塞
。若是指定的phase與Phaser當前的phase不一致,或者Phaser 中止了,則當即返回。
參數 phase:
一般就是arrive()、arriveAndDeregister()的返回值; |
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException |
此方法是可中斷的,其餘與awaitAdvance()一致 |
public int awaitAdvanceInterruptibly( int phase, long timeout,TimeUnit unit) throws InterruptedException, TimeoutException |
超時等待方法,其餘與awaitAdvance()一致 |
public int register() | 新註冊一個party,致使Phaser內部registerPaties數量加1;若是此時onAdvance方法正在執行,此方法將會等待它執行完畢後纔會返回。此方法返回當前的phase週期數,若是Phaser已經中斷,將會返回負數。 |
public int bulkRegister(int parties) | 批量註冊多個party,與register()類似 |
protected boolean onAdvance(int phase, int registeredParties) | barrier action(屏障方法)。若是須要,則必須繼承Phaser類,重寫此方法。若是返回true表示此Phaser應該終止(此後將會把Phaser的狀態爲termination,即isTermination()將返回true。),不然能夠繼續進行。phase參數表示當前週期數,registeredParties表示當前已經註冊的parties個數。 默認實現爲:return registeredParties == 0;在不少狀況下,開發者能夠經過重寫此方法,來實現自定義的 |
public void forceTermination() | 強制終止,此後Phaser對象將不可用,即register等將再也不有效。此方法將會致使Queue中全部的waiter線程被喚醒。這個方法對於在一個或多個任務遇到意外異常以後協調恢復是頗有用的。 |
public int getArrivedParties() | 獲取已經到達的parties個數。 |
public int getUnarrivedParties() | 獲取沒有到達的parties個數。 |
public Phaser getParent() | 獲取其父親類Phaser,沒有則返回null |
public Phaser getRoot() | 返回該phaser的根祖先,若是沒有父類,返回此phaser。 |
public boolean isTerminated() | 若是該phaser被終止,則返回true。 |
例子很簡單,模擬跑步比賽的過程,分爲三個階段:一、參賽者到達起跑點,並在起跑點等待其餘參賽者;二、參賽者齊人後,開始準備,並等待槍聲。三、參賽這到達終點,並結束比賽,再也不等待任何狀況。ide
public class PhaserTest{
public static MyPhaser myPhaser = new MyPhaser();
public static void main(String[] args) {
MyPhaser myPhaser = new MyPhaser();
// 一次性註冊5個party,即創建5個屏障點
myPhaser.bulkRegister(5);
for (int i = 0; i < 5; i++) {
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
// 第一階段(週期),phaser的週期數初始值爲0
System.out.println(Thread.currentThread().getName() + "到達了起跑點!");
// 到達了屏障點(起跑點),阻塞等待其餘線程
myPhaser.arriveAndAwaitAdvance();
// 繼續運行,將進入第二階段,phaser的週期數加一
System.out.println(Thread.currentThread().getName() + "準備起跑!");
// 到達了屏障點(準備起跑),阻塞等待其餘線程
myPhaser.arriveAndAwaitAdvance();
// 進入第三階段
System.out.println(Thread.currentThread().getName() + "到達了終點!");
// 參數者到達了終點,結束比賽,再也不等待其餘參賽者
myPhaser.arriveAndDeregister();// 取消註冊一個party
}
}, "參賽者" + i + "號");
runner.start();
}
}
}
複製代碼
MyPhaser類,定製 barrier action(屏障事件)
函數
public class MyPhaser extends Phaser {
@Override
//改寫onAdvance方法
public boolean onAdvance(int phase, int registeredParties) {
//判斷當前的Phaser是否終止
if (!isTerminated()) {
// 分紅三個階段,在不一樣的階段(週期),執行不一樣的屏障事件
if (phase == 0) {
// ....
System.out.println("第一階段:全部參賽者都到達了起跑點!");
} else if (phase == 1) {
// ....
System.out.println("第二階段:全部參賽者都已經就位,並準備好!比賽正式開始");
} else if (phase == 2) {
// ....
System.out.println("第三階段:全部參賽者都到達終點,比賽結束!!");
}
}
return super.onAdvance(phase, registeredParties);
}
}
複製代碼
運行結果:
工具
參賽者0號到達了起跑點!
參賽者3號到達了起跑點!
參賽者4號到達了起跑點!
參賽者2號到達了起跑點!
參賽者1號到達了起跑點!
第一階段:全部參賽者都到達了起跑點!
參賽者0號準備起跑!
參賽者1號準備起跑!
參賽者2號準備起跑!
參賽者3號準備起跑!
參賽者4號準備起跑!
第二階段:全部參賽者都已經就位,並準備好!比賽正式開始
參賽者4號到達了終點!
參賽者1號到達了終點!
參賽者0號到達了終點!
參賽者2號到達了終點!
參賽者3號到達了終點!
第三階段:全部參賽者都到達終點,比賽結束!
複製代碼
下面的例子:每個Phaser週期類註冊的線程數目不能超過TASKS_PER_PHASER
(例子中是4個),不然就要增長一層子phaser層。ui
public class PhaserTest6 {
//
private static final int = 4;
public static void main(String args[]) throws Exception {
//
final int phaseToTerminate = 3;
//建立一個Phaser父類對象
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) { //屏障方法
System.out.println("====== " + phase + " ======");
return phase == phaseToTerminate || registeredParties == 0;
}
};
//建立10個任務
final Task tasks[] = new Task[10];
build(tasks, 0, tasks.length, phaser);
for (int i = 0; i < tasks.length; i++) {
System.out.println("starting thread, id: " + i);
final Thread thread = new Thread(tasks[i]);
thread.start();
}
}
//遞歸分層,
public static void build(Task[] tasks, int lo, int hi, Phaser ph) {
//若是任務的數量超過每一層的phaser的閾值TASKS_PER_PHASER,則要繼續分層
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
//當前的phaser(ph)做爲父週期,來建立一個子phaser
build(tasks, i, j, new Phaser(ph));
}
} else {
//線程的數量在閾值內,無需分紅,能夠直接註冊線程到當前的Phaser
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(i, ph);
}
}
public static class Task implements Runnable {
//
private final int id;
private final Phaser phaser;
public Task(int id, Phaser phaser) {
this.id = id;
this.phaser = phaser;
this.phaser.register();
}
@Override
public void run() {
while (!phaser.isTerminated()) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// NOP
}
System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
phaser.arriveAndAwaitAdvance();
}
}
}
}
複製代碼
須要注意的是,TASKS_PER_PHASER的值取決於具體的Task實現。對於Task執行時間很短的場景(也就是競爭相對激烈),能夠考慮使用較小的TASKS_PER_PHASER值,例如4。反之能夠適當增大this
運行結果:spa
in Task.run(), phase: 0, id: 2
in Task.run(), phase: 0, id: 1
in Task.run(), phase: 0, id: 3
in Task.run(), phase: 0, id: 0
in Task.run(), phase: 0, id: 8
in Task.run(), phase: 0, id: 5
in Task.run(), phase: 0, id: 9
in Task.run(), phase: 0, id: 7
in Task.run(), phase: 0, id: 4
in Task.run(), phase: 0, id: 6
====== 0 ======
in Task.run(), phase: 1, id: 9
in Task.run(), phase: 1, id: 6
in Task.run(), phase: 1, id: 1
in Task.run(), phase: 1, id: 7
in Task.run(), phase: 1, id: 8
in Task.run(), phase: 1, id: 5
in Task.run(), phase: 1, id: 0
in Task.run(), phase: 1, id: 4
in Task.run(), phase: 1, id: 3
in Task.run(), phase: 1, id: 2
====== 1 ======
in Task.run(), phase: 2, id: 6
in Task.run(), phase: 2, id: 0
in Task.run(), phase: 2, id: 2
in Task.run(), phase: 2, id: 3
in Task.run(), phase: 2, id: 7
in Task.run(), phase: 2, id: 5
in Task.run(), phase: 2, id: 8
in Task.run(), phase: 2, id: 9
in Task.run(), phase: 2, id: 1
in Task.run(), phase: 2, id: 4
====== 2 ======
in Task.run(), phase: 3, id: 3
in Task.run(), phase: 3, id: 4
in Task.run(), phase: 3, id: 9
in Task.run(), phase: 3, id: 5
in Task.run(), phase: 3, id: 8
in Task.run(), phase: 3, id: 1
in Task.run(), phase: 3, id: 7
in Task.run(), phase: 3, id: 0
in Task.run(), phase: 3, id: 2
in Task.run(), phase: 3, id: 6
====== 3 ======
複製代碼
文章源地址:https://www.cnblogs.com/jinggod/p/8494624.html