以前咱們已經講解過關於AQS的獨佔鎖,這一章節主要講解AQS的共享鎖,以Semaphore
信號量來進行講解,相信經過看了本章節內容的同窗能夠對AQS的共享模式有一個瞭解,Semaphore
信號量提供了用於控制資源同時被訪問的個數,也就是它會維護一個許可證,訪問資源以前須要申請許可證,申請許可證成功後才能夠進行訪問,若是申請訪問資源獲取的了許可證,則能夠進行資源訪問,同時頒發許可證中心的許可證會進行增長,等到訪問資源的線程釋放資源後,許可證使用狀況會進行減小。java
public class SemaphoreDemo {
private static final Semaphore semaphore = new Semaphore(3);
private static final AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "開始執行");
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + "執行完畢");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
複製代碼
運行結果以下:node
pool-1-thread-2開始執行
pool-1-thread-3開始執行
pool-1-thread-1開始執行
pool-1-thread-4開始執行
pool-1-thread-6開始執行
pool-1-thread-5開始執行
pool-1-thread-7開始執行
pool-1-thread-9開始執行
pool-1-thread-8開始執行
pool-1-thread-10開始執行
複製代碼
經過一個例子來撥開Semaphore
的面紗,上面咱們定義了信號量爲3個,也就是同時能夠得到鎖的線程只有3個,經過調用semaphore.acquire();
申請信號量,若是申請成功則執行下面的邏輯,在後面經過semaphore.release();
釋放掉信號量的佔用,也就是說經過semaphore.acquice
從信號量中獲取一個許可,若是許可經過則執行下面的語句,若是沒有許可可發放則等待,而後經過semaphore.release();
歸還一個許可,上面例子中的輸出內容也可清晰的看到同事只有兩個線程可以訪問資源。併發
藉助圖示來講明一下:ide
首先初始化的時候下面圖示內容是放置了3個許可證,以下圖所示: 函數
左邊是線程,中間是要訪問的資源文件,最右側表明的是許可證池,當第一個線程嘗試訪問資源時,須要先從信號量中得到一個許可,若是信號量中能夠得到許可則能夠訪問資源,若是沒有許可能夠頒發則表示須要等待,下面來看一下操做: 首先線程1訪問資源時,在沒有許可的訪問時,是行不通的,是須要先進行許可的申請,申請成功後才能夠訪問資源,此時信號量中的許可會進行減小,看上圖中只剩下了兩個許可,若是四個線程同時訪問時必然會有一個線程處於等待狀態,以下圖所示: 如上圖所示中間兩個線程簡化了申請的過程,直接擁有許可訪問權限,若是申請許可失敗了,則會等待其餘線程歸還許可,發現線程4正在等待許可中,當其餘線程調用release
方法後會將許可歸還給信號量中,以下圖所示:
這是線程4發現已經有可用的許可了,他就會不在等待拿着許可趕忙去訪問資源,這裏我就再也不畫圖了,和上面同樣,大體咱們已經瞭解了關於信號量的內容,接下來咱們就要對源碼進行分析。
Semaphore`主要方法有如下內容:oop
方法名 | 描述 |
---|---|
acquire() | 嘗試得到一個准入許可,如沒法得到,則線程等待,直到有線程釋放一個許可或當線程被中斷。 |
acquire(int permits) | 嘗試得到permits個准入許可,如沒法得到,則線程等待,直到有線程釋放permits個許可或當線程被中斷。 |
acquireUninterruptibly() | 嘗試得到一個准入許可,如沒法得到,則線程等待,直到有線程釋放一個許可,可是不響應中斷請求 |
acquireUninterruptibly(int permits) | 嘗試得到permits個准入許可,如沒法得到,則線程等待,直到有線程釋放permits個許可,可是不響應中斷請求 |
release() | 用於在線程訪問資源結束後,釋放一個許可,以使其餘等待許可的線程能夠進行資源訪問。 |
release(int permits) | 用於在線程訪問資源結束後,釋放permits個許可,以使其餘等待許可的線程能夠進行資源訪問。 |
tryAcquire() | 嘗試得到一個許可,若是得到許可成功返回true,若是失敗則返回fasle,它不會等待,當即返回 |
tryAcquire(int permits) | 嘗試得到permits個許可,若是得到許可成功返回true,若是失敗則返回fasle,它不會等待,當即返回 |
tryAcquire(int permits, long timeout, TimeUnit unit) | 嘗試在指定時間內得到permits個許可,若是在指定時間內沒有得到許可則則返回false,反之返回true |
tryAcquire(long timeout, TimeUnit unit) | 嘗試在指定時間內得到一個許可,若是在指定時間內沒有得到許可則則返回false,反之返回true |
availablePermits(): | 當前可用的許可數 |
經過上面方法的大體介紹,Semaphore
提供了對信號量獲取的操做,獲取的過程當中有等待操做,也有當即返回的方法,有的響應中斷有的又不響應中斷,下面會以一些簡單的例子,進行分析一下源碼內容,針對下面的例子來進行分析:源碼分析
public class SemaphoreDemo {
private static final Semaphore semaphore = new Semaphore(1);
private static final AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 2; i++) {
Thread.sleep(100);
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "開始執行");
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() + "執行完畢");
// semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
複製代碼
針對上面的例子,咱們先初始化了1個信號量,線程池提交了2個任務,這時候你們也想到了運行結果,運行結果就是隻有1一個線程可以執行完畢,其他的線程都須要等到操做,由於信號量被消耗了,下面是輸出結果:ui
pool-1-thread-1開始執行
pool-1-thread-1執行完畢
複製代碼
咱們這裏特別在提交到線程池任務的時候睡眠了一會,其實想要達到的目的是可以讓每一個線程執行按照順序排下去,否則可能順序就不定了,固然也沒有太大影響,這裏只是爲了方便分析,當第一個線程提交任務到線程池時,它會先通過semaphore.acquire()
方法來進行得到一個許可操做,下面咱們來看一下源碼:this
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
複製代碼
咱們能夠看到它調用了sync.acquireSharedInterruptibly(1)
方法,這個snyc
實際上是Semaphore
內部類Sync
的實例對象,那麼問題來了,這個sync
變量是何時初始化的呢?其實當咱們初始化Semaphore
,就已經將sync
變量初始化了,接下來咱們看一下Semaphore
構造函數:atom
// 非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// fair=true爲公平模式,false=非公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
複製代碼
/** * 非公平模式 */
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//實現AQS的tryAcquireShared方法,嘗試得到鎖。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/** * 公平模式 */
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//實現AQS的tryAcquireShared方法,嘗試得到鎖。
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
複製代碼
咱們能夠發現Semaphore
提供了兩種模式的鎖機制,一種是公平模式,一種是非公平模式,公平模式其實就是若是發現了有線程在排隊等待,則自覺到後面去排隊,而非公平模式則不同,它無論你有沒有在排隊的線程,誰先搶到是誰的,說到這裏咱們發現上例子中當聲明Semaphore
時,其實默認使用了非公平模式NonfairSync
,指定了信號量數量爲1個,其實它內部Sync
中調用了AQS
的setState
方法,設置同步器狀態state
爲1,詳細以下圖所示:
接下來咱們在回到acquire
方法中,它調用了sync.acquireSharedInterruptibly(1);
,細心地朋友會發現NonfairSync
和父類Sync
中並無該方法,其實該方法是AQS
提供的方法,接下來咱們看一下這個方法到底作了什麼?源碼內容以下所示:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) //若是線程被中斷則拋出異常。
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //嘗試獲取信號量,若是得到信號量小於0,則表明獲取失敗則運行下面的語句。
doAcquireSharedInterruptibly(arg); //將當前線程節點加入到隊列中,等到其餘線程釋放信號量。
}
複製代碼
doAcquireSharedInterruptibly
方法,將當前線程掛起等待其餘線程釋放信號量。接下來咱們看一下tryAcquireShared
方法實現,tryAcquireShared
這個方法是AQS
提供給子類實現的方法,它自身並無實現,只是拋出了異常,實現它的類必然是Semaphore
的Sync
類,咱們發現實現該方法有兩個,包括非公平模式的NonfairSync
,另一個是公平模式下的FairSync
,因爲咱們上例子中採用的是非公平模式,咱們看一下非公平模式下的tryAcquireShared
實現邏輯:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
複製代碼
它(NonfairSync
)內部調用了父類Sync
的nonfairTryAcquireShared
方法,繼續刨根問底看一下這個方法:
final int nonfairTryAcquireShared(int acquires) {
//這裏上來就是個死循環
for (;;) {
//獲取可用的信號量數量。
int available = getState();
//剩餘線程數,其實就是當前可用信號量數量-申請的信號量數量
int remaining = available - acquires;
//1. 若是剩餘信號量數量小於0,表明沒有信號量可用
//2. 修改state成功,則表明申請信號量成功。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
複製代碼
若是讀過前面ReentrantLock
源碼的朋友會發現,它對state
是增長的,也就是若是state
值被設置上了值,則表明已經有線程得到了鎖,其餘線程不容許獲取當前鎖,若是是當前線程從新得到鎖,則state
值會增長,這也是重入鎖的關鍵點,而Semaphore
與之不一樣點在於,它是對state
同步器狀態進行減小操做,換句話說先初始化若干信號量,若是得到信號量時,剩餘信號量小於0,則表明沒有可用的信號量,則直接返回,若是得到信號量成功則對state
值進行修改,回到上面的例子中,咱們剛分析道其中第一個線程,第一個線程得到到了信號量,此時剩餘信號量爲0,它會將state
值設置爲0,設置以後回到了acquireSharedInterruptibly
的if (tryAcquireShared(arg) < 0)
語句中,if語句爲true,不進入到if語句內部,此時AQS的狀況以下圖所示:
state
已經變成0了,當它執行到
tryAcquireShared
去獲取信號量時,可用信號量爲0個,當可用信號量減去申請的信號量個數1時,此時剩餘信號量變成了-1,因此這時候if語句的條件
remaining < 0
是知足的,進入到if語句中,返回的是剩餘信號量-1,此時會跳轉到調用地方,也就是AQS的
acquireSharedInterruptibly
方法中,這時候發現if語句中(
tryAcquireShared(arg) < 0
)返回結果是-1,會進入到if語句內部執行
doAcquireSharedInterruptibly
方法,這個方法主要操做是將當前線程放入到等待隊列中,等到其餘線程釋放信號量,接下來慢慢剖析一下內部源碼
AQS->doAcquireSharedInterruptibly
:
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //將節點添加到隊列中
boolean failed = true; //失敗標誌位,若是出現中斷,或者異常拋出時會進行取消操做
try {
for (;;) {
final Node p = node.predecessor(); //獲取當前節點的前節點
if (p == head) { //若是當前節點的前節點爲頭節點
int r = tryAcquireShared(arg); //嘗試得到鎖
if (r >= 0) { //若是能夠得到信號量
setHeadAndPropagate(node, r); //設置當前節點爲頭節點
p.next = null; // help GC //幫助GC回收
failed = false; //設置失敗爲false,也就是正常獲取
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
這時候當咱們的程序運行到addWaiter
,來看一下這個時候隊列的狀況:
Ref-405
是頭節點,而後再將當前線程的節點指向頭節點也就是上圖中所示內容,由於這裏
addWaiter
內部代碼和以前分析的AQS的獨佔鎖(也就是
ReentrantLock
源碼)的時候已經分析過了,這裏就不在贅述了,將線程加入到等待隊列中以後,接下來進入到for死循環中去,首先上來獲取當前節點的頭節點,也就是上圖的
Ref-405
,而後判斷是否是頭節點,這裏面的內容其實就是再去嘗試爭搶一下信號量,看有沒有信號量釋放,若是返回的信號量剩餘個數大於等於0,則表明爭搶信號量成功,須要對節點進行處理,可是咱們這個例子中,當進行
tryAcquireShared
時,返回的值是-1,因此獲取信號量失敗,不會進入到下面內容,可是咱們在這裏先進行分析分析一下這個方法
setHeadAndPropagate
,爲後面埋下伏筆:
//從方法中也能夠看出來大體意思是設置頭節點,而且根據條件是否喚醒後繼節點。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 記錄一下原來的head頭節點。
setHead(node); // 設置新的節點設置爲頭節點。
if (propagate > 0 || //若是有信號量
h == null || //頭節點爲空狀況下
h.waitStatus < 0 || //若是原來的頭結點的狀態是負數,這裏指的是SIGNAL和PROPAGATE兩個狀態
(h = head) == null || // 從新閱讀頭節點防止額外的競爭。
h.waitStatus < 0) { //若是原來的頭結點的狀態是負數,這裏指的是SIGNAL和PROPAGATE兩個狀態
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
複製代碼
當他沒有進入到setHeadAndPropagate方法,它會走下面的步驟:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
複製代碼
shouldParkAfterFailedAcquire
將頭節點修改成SIGNAL
,parkAndCheckInterrupt
將線程進行阻塞,運行到這裏是線程被掛起,等待其餘線程喚醒,此時隊列狀態以下所示:
當咱們調用semaphore.release()
進行釋放信號量時,它其實調用的是AbstractQueuedSynchronizer
中的releaseShared(int arg)
方法,咱們來看一下源碼內容:
public void release() {
sync.releaseShared(1);
}
複製代碼
接下來分析一下AQS中的releaseShared
方法:
public final boolean releaseShared(int arg) {
// 調用Semaphore實現的tryReleaseShared方法。
if (tryReleaseShared(arg)) {
// 喚醒後記節點
doReleaseShared();
return true;
}
return false;
}
複製代碼
先進嘗試釋放信號量,若是信號量釋放成功,則進行調用doReleaseShared來進行喚醒等待的節點,告知隊列中等待的節點已經有信號量了能夠進行獲取了。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取當前的state值
int current = getState();
// 將當前的state值添加releases個信號量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// cas修改state值
if (compareAndSetState(current, next))
return true;
}
}
複製代碼
其實最主要的方法是方法是doReleaseShared
方法,咱們來看一下源碼:
/** * 喚醒隊列中的節點,以及修改頭結點的waitStatus狀態爲PROPAGATE * 1. 若是頭節點等待狀態爲SIGNAL,則將頭節點狀態設爲0,並喚醒後繼節點 * 2. 若是頭節點等待狀態爲0,則將頭節點狀態設爲PROPAGATE,保證喚醒可以正常傳播下去。 */
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 若是頭結點的狀態爲SIGNAL則進行喚醒操做。
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 若是頭節點狀態爲0,則將頭節點狀態修改成PROPAGATE,至於爲何會變成0,爲何要有PROPAGATE?,請看下文。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
複製代碼
這裏可能你們會有一個疑問,爲何不是直接propagate > 0,而後就直接喚醒下一個節點呢?這裏我要引用一下以前的版本中的一個bug來講明一下: BUG-6801020
根據BUG中的描述影響的版本號是 JDK 6u11,6u17 兩個版本,BUG中說起到了復現bug的代碼以下所示:
import java.util.concurrent.Semaphore;
public class TestSemaphore {
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
複製代碼
接下來看一下受影響版本號中的setHeadAndPropagate
和releaseShared
兩個方法源碼,以下:
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
// 這裏是區別點,他這裏直接是比較的信號量若是存在,而且當前節點的等待狀態不等於0,纔會去喚醒下一個線程。
if (propagate > 0 && node.waitStatus != 0) {
/* * Don't bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
在JDK JDK 6u11,6u17 版本中發現中是沒有PROPAGATE
這種狀態的,在後面的版本中引入是爲了解決共享模式下併發釋放致使的線程hang住問題,上面例子運行一段時間後,偶爾會出現線程hang住的狀況。上面例子中初始化了四個線程,信號量初始化時是0個,t1線程和t2線程是獲取信號量,t3和t4線程是釋放信號量,假設某種狀況極端的狀況下t1和t2添加到了隊列中,以下圖所示:
releaseShared
方法,會調用unparkSuccessor
,這個方法是用來通知等待線程,此時head中的waitStatus由-1變成0,而後喚醒線程t1,此時信號量爲1。doAcquireSharedInterruptibly
方法裏面,當線程喚醒的時候也是從這個方法中進行執行,當t1線程嘗試得到信號量時,發現能夠得到信號量,tryAcquireShared
返回的是0,由於消耗了一個信號量,而此時當前線程沒有進行繼續往下操做,而是進行了線程切換,此時線程狀態以下:releaseShared
,此時頭節點的waitStatus=0,直接返回false,並未調用unparkSuccessor,可是此時信號量變成了1。setHeadAndPropagate
方法的時候,他沒有進入到if語句的內部,因此t2線程一直沒有被喚醒,致使主線程掛起。jdk1.8中的setHeadAndPropagate
並無直接調用unparkSuccessor
方法,而是修改調用doReleaseShared
方法,咱們來看一下這個方法跟上面bug中有什麼區別:
/** * 1.喚醒隊列中的節點 * 2.若是隊列頭節點waitStatus=0,則將當前head頭節點修改成PROPAGATE(-3)狀態 */
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
複製代碼
這裏咱們回到上面第三步,此時切換到t4線程,t4調用releaseShared
,此時頭節點的waitStatus=0,直接返回false,並未調用unparkSuccessor,可是此時信號量變成了1,而且將head頭節點的waitStatus狀態修改成-3。
13.png
回到上面第四步驟:此時t1線程被喚醒,繼續執行將head節點指向了Ref-505
,而且當時的信號量只有1個,他本身消耗了信號量,雖然如今state=1,可是咱們能夠看線程切換時,信號量的state=0
,因此線程切換回去以後,它的propagate=0
,調用setHeadAndPropagate
方法的時候,此時head頭節點的狀態是PROPAGATE(-3)
,會進入到if語句中執行doReleaseShared
方法,此時喚醒線程t2。
因爲文章寫的時間比較長,中間伴隨着找工做,因此耽擱的時間有點長了,若是有錯誤的地方請指正,我這邊及時作更正。寫文章不易,讀到這裏的朋友都是好樣的,因爲文章篇幅有點長,在釋放信號量地方沒有圖示來進行表示,可是後面大體講解了釋放的流程因此這裏就不贅述了。謝謝