一 概述
二 源碼總覽
三 acquire-請求令牌
四 release-釋放令牌
五 總結
複製代碼
semaphore是信號的意思,在併發包中則表示持有指定數量令牌的信號量。它一般用於多線程同時請求令牌的控制。提供了acquire方法用於獲取令牌,當令牌發放完後則進行阻塞等待,持有令牌的線程完成任務後須要調用release方法歸還令牌。semaphore的使用很簡單,如今經過學習它的源碼來了解它的實現原理是怎樣的。java
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
// 全部機制都經過AbstractQueuedSynchronizer子類Sync來完成的
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
// 採用AQS中的state來統計令牌數
setState(permits);
}
……
}
// 非公平鎖
static final class NonfairSync extends Sync {……}
// 公平鎖
static final class FairSync extends Sync{……}
// 默認構造方法-默認非公平鎖
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 提供選擇公平鎖仍是非公平鎖的構造方法
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 請求一個令牌-響應中斷+阻塞
public void acquire() throws InterruptedException {……}
// 指定獲取領牌數-響應中斷且阻塞
public void acquire(int permits) throws InterruptedException {……}
// 請求一個令牌-不響應中斷+阻塞
public void acquireUninterruptibly() {……}
// 嘗試請求一個令牌-非阻塞,失敗當即返回
public boolean tryAcquire(){……}
// 嘗試請求一個令牌,阻塞指定時間,超時後返回
public boolean tryAcquire(long timeout, TimeUnit unit){……}
// 釋放令牌
public void release(){……}
}
複製代碼
整體來講,Semaphore內部有一個繼承於AQS的內部類Sync,利用AQS的共享鎖來實現對共享變量state進行操做,並將state做爲令牌的計數, 並提供了公平和非公平鎖的方式來獲取令牌,總體的設計跟ReentrantLock很像。下面以最經常使用的acquire和release方法爲例,詳細瞭解他們的原理;node
先看acquire的方法體:微信
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
複製代碼
裏面實際調用的是AQS的請求共享鎖方法:多線程
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先進行一遍嘗試獲取鎖,當返回小於0說明令牌不足了,則須要將當前線程加入到等待隊列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
複製代碼
接着先回調在Semaphore中重寫的tryAcquireShared()方法嘗試獲取鎖:併發
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
// 循環獲取令牌
for (;;) {
// 獲取當前可用的令牌數
int available = getState();
// 當前獲取完後剩下的令牌數
int remaining = available - acquires;
// 剩下領牌數小於0或者大於等於0並CAS更新成功則直接返回剩餘令牌數
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
複製代碼
簡單嘗試獲取令牌失敗後則再CAS嘗試幾回後加入同步隊列中休眠等待:oop
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 在同步隊列中增長等待節點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 獲取當前節點的前驅節點
final Node p = node.predecessor();
// 若是前驅節點爲head節點,表示當前節點是同步等待隊列中的第一個,故繼續嘗試一次獲取鎖
if (p == head) {
// 嘗試獲取令牌,此時會跳轉到semaphore中(由於重寫了該方法)
int r = tryAcquireShared(arg);
// 返回大於0則表示成功獲取到令牌了
if (r >= 0) {
// 將當前節點設爲head節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 自旋幾回後爲避免強佔CPU,則對該線程進行休眠處理
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 因中斷請求則取消排隊請求
if (failed)
cancelAcquire(node);
}
}
複製代碼
簡單總結下acquire方法流程爲:學習
release方法體:ui
public void release() {
sync.releaseShared(1);
}
複製代碼
實際調用的是AQS的方法:this
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
複製代碼
先調用在Semaphore中重寫的嘗試釋放令牌方法,而且釋放成功後返回true:spa
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取當前令牌數量
int current = getState();
// 計算釋放後的令牌數量
int next = current + releases;
// 若是本次釋放的令牌爲負數則拋出異常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS更新令牌數成功後返回true
if (compareAndSetState(current, next))
return true;
}
}
複製代碼
釋放令牌成功後,喚醒在同步隊列中等待的線程:
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
複製代碼
簡單總結下release流程:
Semaphore利用AQS中的共享鎖來操做共享變量state,並使用state做爲令牌的計數。每一個線程調用required請求獲取令牌,調用release則釋放領牌。當令牌取完時則剩下的線程加入AQS隊列中阻塞等待,當有令牌釋放時會喚醒等待的線程。