本篇文章將介紹Semaphore和Exchanger這兩個併發工具類。java
信號量(英語:Semaphore)又稱爲信號標,是一個同步對象,用於保持在0至指定最大值之間的一個計數值。當線程完成一次對該semaphore對象的等待(wait)時,該計數值減一;當線程完成一次對semaphore對象的釋放(release)時,計數值加一。當計數值爲0,則線程等待該semaphore對象再也不能成功直至該semaphore對象變成signaled狀態。semaphore對象的計數值大於0,爲signaled狀態;計數值等於0,爲nonsignaled狀態.git
semaphore對象適用於控制一個僅支持有限個用戶的共享資源,是一種不須要使用忙碌等待(busy waiting)的方法。 ----取自維基百科github
Semaphore思想在分佈式中也有應用,分佈式限流就是典型的案例。如今舉個小例子來使用Semaphorebash
在等公交時,遇到人多的時候常常須要排隊或者擠進去。網絡
利用Semaphore初始化5個許可,每次只能有5個玩家進入,當有玩家退出時,其餘玩家才能進入。併發
先介紹下Semaphore的構造函數和一些方法吧。dom
public Semaphore(int permits);
public Semaphore(int permits, boolean fair);
複製代碼
第一個參數permits表示初始化的許可數量,第二個參數表示是不是公平的。分佈式
使用Semaphore(int permits)構造函數時,默認使用非公平的ide
public void acquire();
public void release();
複製代碼
acquire方法取得許可,release方法表示釋放許可。函數
注:若是屢次調用release方法,會增長許可。例如,初始化許可爲0,這時調用了兩個release方法,Semaphore的許可便會變成2
這兩個是最經常使用的方法,其餘的還有acquire相關的方法tryAcquire和acquireUninterruptibly這裏就不介紹了。
定義一個實現Runnable接口的玩家類
public class Player implements Runnable{
private String playerName;
private Semaphore semaphore;
public Player(String playerName, Semaphore semaphore) {
this.playerName = playerName;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(playerName+"進入,時間:"+LocalTime.now());
Thread.sleep((long) (3000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(playerName+"退出");
semaphore.release();
}
}
}
複製代碼
經過構造函數Player傳入玩家名稱和Semaphore對象,run方法中先調用acquire方法取得許可,而後睡眠隨機時間,最後在finally中調用release方法釋放許可。
先來使用非公平的看看效果,使用非公平的就比如平時的擠公交,誰先在車門口誰先進。以下圖(來源於網絡)
如今來看看測試代碼
public static void main(String[] args) throws IOException {
Semaphore semaphore = new Semaphore(5);
//Semaphore semaphore = new Semaphore(5,true);
ExecutorService service = Executors.newCachedThreadPool();
//模擬100個玩家排隊
for (int i = 0; i < 100; i++) {
service.submit(new Player("玩家"+i,semaphore));
}
//關閉線程池
service.shutdown();
//判斷線程池是否中斷,沒有則循環查看當前排隊總人數
while (!service.isTerminated()){
System.out.println("當前排隊總人數:"+semaphore.getQueueLength());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
若是要切換成公平方式只需將上面初始化Semaphore改成下面的代碼便可
Semaphore semaphore = new Semaphore(5,true);
複製代碼
Exchanger主要用於線程間的數據交換。 它提供了一個同步點,在這個同步點,兩個線程能夠交換數據。
這裏寫了個兩個線程互相交換數據的簡單例子,下面ExchangerRunnable在run方法中調用exchange方法將本身的數據傳過去。
public class ExchangerRunnable implements Runnable {
private Object data;
private String name;
private Exchanger exchanger;
public ExchangerRunnable(String name, Exchanger exchanger, Object data) {
this.exchanger = exchanger;
this.name = name;
this.data = data;
}
public void run() {
try {
Object previous = this.data;
this.data = this.exchanger.exchange(previous);
System.out.println("名稱:" + name + " 以前數據:" + previous + " ,交換以後數據:" + this.data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
接下來看看測試代碼
public class Case {
private static final Exchanger exchanger = new Exchanger();
private static ExecutorService service = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
service.submit(new ExchangerRunnable("1", exchanger, "A"));
service.submit(new ExchangerRunnable("2", exchanger, "B"));
service.shutdown();
}
}
複製代碼
定義了只包含兩個線程的線程池,而後建立提交兩個ExchangerRunnable的類
運行測試代碼,會獲得以下結果
名稱:2 以前數據:B ,交換以後數據:A
名稱:1 以前數據:A ,交換以後數據:B
複製代碼
案例源代碼地址:github.com/rainbowda/l…
歡迎fork、Star、Issue等,謝謝