歡迎關注掘金:【Ccww】,一塊兒學習
提高能力,漲薪可待
面試知識,工做可待
實戰演練,拒絕996
也歡迎關注微信公衆號【Ccww筆記】,原創技術文章第一時間推出
若是此文對你有幫助、喜歡的話,那就點個讚唄!html
是否是感受在工做上難於晉升了呢?
是否是感受找工做面試是那麼難呢?
是否是感受本身天天都在996加班呢?node
在工做上必須保持學習的能力,這樣才能在工做獲得更好的晉升,漲薪指日可待,歡迎一塊兒學習【提高能力,漲薪可待】系列
在找工做面試應在學習的基礎進行總結面試知識點,工做也指日可待,歡迎一塊兒學習【面試知識,工做可待】系列
最後,理論知識到準備充足,是否是該躬行起來呢?歡迎一塊兒學習【實戰演練,拒絕996】系列面試
AQS全稱AbstractQueuedSynchronizer
,即抽象的隊列同步器,是一種用來構建鎖和同步器的框架。api
基於AQS構建同步器:安全
優點:微信
若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。如圖所示:併發
Sync queue: 同步隊列,是一個雙向列表。包括head節點和tail節點。head節點主要用做後續的調度。框架
Condition queue: 非必須,單向列表。當程序中存在cindition的時候纔會存在此列表。AQS它的全部子類中,要麼實現並使用了它的獨佔功能的api,要麼使用了共享鎖的功能,而不會同時使用兩套api,即使是最有名的子類ReentrantReadWriteLock也是經過兩個內部類讀鎖和寫鎖分別實現了兩套api來實現的學習
state狀態使用volatile int類型的變量,表示當前同步狀態。state的訪問方式有三種:ui
CANCELLED
waitStatus值爲1時表示該線程節點已釋放(超時、中斷),已取消的節點不會再阻塞。
SIGNAL
waitStatus爲-1時表示該線程的後續線程須要阻塞,即只要前置節點釋放鎖,就會通知標識爲 SIGNAL 狀態的後續節點的線程
CONDITION
waitStatus爲-2時,表示該線程在condition隊列中阻塞(Condition有使用)
PROPAGATE
waitStatus爲-3時,表示該線程以及後續線程進行無條件傳播(CountDownLatch中有使用)共享模式下, PROPAGATE 狀態的線程處於可運行狀態
由於只有前驅節點是head節點的節點才能被首先喚醒去進行同步狀態的獲取。當該節點獲取到同步狀態時,它會清除本身的值,將本身做爲head節點,以便喚醒下一個節點。
除了同步隊列以外,AQS中還存在Condition隊列,這是一個單向隊列。調用ConditionObject.await()方法,可以將當前線程封裝成Node加入到Condition隊列的末尾,而後將獲取的同步狀態釋放(即修改同步狀態的值,喚醒在同步隊列中的線程)。
Condition隊列也是FIFO。調用ConditionObject.signal()方法,可以喚醒firstWaiter節點,將其添加到同步隊列末尾。
在構建自定義同步器時,只須要依賴AQS底層再實現共享資源state的獲取與釋放操做便可。自定義同步器實現時主要實現如下幾種方法:
線程首先嚐試獲取鎖,若是失敗就將當前線程及等待狀態等信息包裝成一個node節點加入到FIFO隊列中。 接着會不斷的循環嘗試獲取鎖,條件是當前節點爲head的直接後繼纔會嘗試。若是失敗就會阻塞本身直到本身被喚醒。而當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程。
所謂獨佔模式,即只容許一個線程獲取同步狀態,當這個線程尚未釋放同步狀態時,其餘線程是獲取不了的,只能加入到同步隊列,進行等待。
很明顯,咱們能夠將state的初始值設爲0,表示空閒。當一個線程獲取到同步狀態時,利用CAS操做讓state加1,表示非空閒,那麼其餘線程就只能等待了。釋放同步狀態時,不須要CAS操做,由於獨佔模式下只有一個線程能獲取到同步狀態。ReentrantLock、CyclicBarrier正是基於此設計的。
例如,ReentrantLock,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。
獨佔模式下的AQS是不響應中斷的,指的是加入到同步隊列中的線程,若是由於中斷而被喚醒的話,不會當即返回,而且拋出InterruptedException。而是再次去判斷其前驅節點是否爲head節點,決定是否爭搶同步狀態。若是其前驅節點不是head節點或者爭搶同步狀態失敗,那麼再次掛起。acquire以獨佔exclusive方式獲取資源。若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。源碼以下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
流程圖:
tryAcquire()
嘗試直接去獲取資源,若是成功則直接返回;addWaiter()
將該線程加入等待隊列的尾部,並標記爲獨佔模式;acquireQueued()
使線程在等待隊列中休息,有機會時(輪到本身,會被unpark()
)會去嘗試獲取資源。獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。selfInterrupt()
,將中斷補上。tryAcquire
嘗試以獨佔的方式獲取資源,若是獲取成功,則直接返回true
,不然直接返回false
,且具體實現由自定義AQS的同步器實現的。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
根據不一樣模式(Node.EXCLUSIVE
互斥模式、Node.SHARED
共享模式)建立結點並以CAS的方式將當前線程節點加入到不爲空的等待隊列的末尾(經過compareAndSetTail()
方法)。若是隊列爲空,經過enq(node)
方法初始化一個等待隊列,並返回當前節點。
/**
* 參數
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* 返回值
* @return the new node
*/
private Node addWaiter(Node mode) {
//將當前線程以指定的模式建立節點node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 獲取當前同隊列的尾節點
Node pred = tail;
//隊列不爲空,將新的node加入等待隊列中
if (pred != null) {
node.prev = pred;
//CAS方式將當前節點尾插入隊列中
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//當隊列爲empty或者CAS失敗時會調用enq方法處理
enq(node);
return node;
}
複製代碼
其中,隊列爲empty,使用enq(node)
處理,將當前節點插入等待隊列,若是隊列爲空,則初始化當前隊列。全部操做都是CAS自旋的方式進行,直到成功加入隊尾爲止。
private Node enq(final Node node) {
//不斷自旋
for (;;) {
Node t = tail;
//當前隊列爲empty
if (t == null) { // Must initialize
//完成隊列初始化操做,頭結點中不放數據,只是做爲起始標記,lazy-load,在第一次用的時候new
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//不斷將當前節點使用CAS尾插入隊列中直到成功爲止
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
acquireQueued
用於已在隊列中的線程以獨佔且不間斷模式獲取state狀態,直到獲取鎖後返回。主要流程:
結點node進入隊列尾部後,檢查狀態;
調用park()進入waiting狀態,等待unpark()或interrupt()喚醒;
被喚醒後,是否獲取到鎖。若是獲取到,head指向當前結點,並返回從入隊到獲取鎖的整個過程當中是否被中斷過;若是沒獲取到,繼續流程1
final boolean acquireQueued(final Node node, int arg) {
//是否已獲取鎖的標誌,默認爲true 即爲還沒有
boolean failed = true;
try {
//等待中是否被中斷過的標記
boolean interrupted = false;
for (;;) {
//獲取前節點
final Node p = node.predecessor();
//若是當前節點已經成爲頭結點,嘗試獲取鎖(tryAcquire)成功,而後返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire根據對當前節點的前一個節點的狀態進行判斷,對當前節點作出不一樣的操做
//parkAndCheckInterrupt讓線程進入等待狀態,並檢查當前線程是否被能夠被中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//將當前節點設置爲取消狀態;取消狀態設置爲1
if (failed)
cancelAcquire(node);
}
}
複製代碼
release方法是獨佔exclusive模式下線程釋放共享資源的鎖。它會調用tryRelease()釋放同步資源,若是所有釋放了同步狀態爲空閒(即state=0),當同步狀態爲空閒時,它會喚醒等待隊列裏的其餘線程來獲取資源。這也正是unlock()的語義,固然不只僅只限於unlock().
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
tryRelease()
跟tryAcquire()
同樣實現都是由自定義定時器以獨佔exclusive模式實現的。由於其是獨佔模式,不須要考慮線程安全的問題去釋放共享資源,直接減掉相應量的資源便可(state-=arg)。並且tryRelease()
的返回值表明着該線程是否已經完成資源的釋放,所以在自定義同步器的tryRelease()
時,須要明確這條件,當已經完全釋放資源(state=0),要返回true,不然返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
ReentrantReadWriteLock的實現:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//減掉相應量的資源(state-=arg)
int nextc = getState() - releases;
//是否徹底釋放資源
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
複製代碼
unparkSuccessor
用unpark()喚醒等待隊列中最前驅的那個未放棄線程,此線程並不必定是當前節點的next節點,而是下一個能夠用來喚醒的線程,若是這個節點存在,調用unpark()方法喚醒。
private void unparkSuccessor(Node node) {
//當前線程所在的結點node
int ws = node.waitStatus;
//置零當前線程所在的結點狀態,容許失敗
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//找到下一個須要喚醒的結點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 從後向前找
for (Node t = tail; t != null && t != node; t = t.prev)
//從這裏能夠看出,<=0的結點,都是還有效的結點
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒
LockSupport.unpark(s.thread);
}
複製代碼
共享模式,固然是容許多個線程同時獲取到同步狀態,共享模式下的AQS也是不響應中斷的.
很明顯,咱們能夠將state的初始值設爲N(N > 0),表示空閒。每當一個線程獲取到同步狀態時,就利用CAS操做讓state減1,直到減到0表示非空閒,其餘線程就只能加入到同步隊列,進行等待。釋放同步狀態時,須要CAS操做,由於共享模式下,有多個線程能獲取到同步狀態。CountDownLatch、Semaphore正是基於此設計的。
例如,CountDownLatch,任務分爲N個子線程去執行,同步狀態state也初始化爲N(注意N要與線程個數一致):
acquireShared
在共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
複製代碼
流程:
tryAcquireShared()
跟獨佔模式獲取資源方法同樣實現都是由自定義同步器去實現。但AQS規範中已定義好tryAcquireShared()
的返回值:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
doAcquireShared()
用於將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。
private void doAcquireShared(int arg) {
//加入隊列尾部
final Node node = addWaiter(Node.SHARED);
//是否成功標誌
boolean failed = true;
try {
//等待過程當中是否被中斷過的標誌
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//獲取前驅節點
if (p == head) {//若是到head的下一個,由於head是拿到資源的線程,此時node被喚醒,極可能是head用完資源來喚醒本身的
int r = tryAcquireShared(arg);//嘗試獲取資源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//將head指向本身,還有剩餘資源能夠再喚醒以後的線程
p.next = null; // help GC
if (interrupted)//若是等待過程當中被打斷過,此時將中斷補上。
selfInterrupt();
failed = false;
return;
}
}
//判斷狀態,隊列尋找一個適合位置,進入waiting狀態,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
releaseShared()
用於共享模式下線程釋放共享資源,釋放指定量的資源,若是成功釋放且容許喚醒等待線程,它會喚醒等待隊列裏的其餘線程來獲取資源。
public final boolean releaseShared(int arg) {
//嘗試釋放資源
if (tryReleaseShared(arg)) {
//喚醒後繼結點
doReleaseShared();
return true;
}
return false;
}
複製代碼
獨佔模式下的tryRelease()在徹底釋放掉資源(state=0)後,纔會返回true去喚醒其餘線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制必定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就能夠喚醒後繼等待結點。
https://www.cnblogs.com/waterystone/p/4920797.html
doReleaseShared()
主要用於喚醒後繼節點線程,當state爲正數,去獲取剩餘共享資源;當state=0時去獲取共享資源。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//喚醒後繼
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// head發生變化
if (h == head)
break;
}
}
複製代碼
歡迎關注微信公衆號【Ccww筆記】,原創技術文章第一時間推出