以前介紹過併發問題的解決方式就是通常經過鎖,concurrent包中最重要的接口就是lock接口,它能夠顯示的獲取或者釋放鎖,對於lock接口來講最多見的實現就是ReetrantLock(可重入鎖),而ReetrantLock的實現又離不開AQS。node
AQS是concurrent包中最核心的併發組件,在讀本文以前建議先閱讀:設計模式
https://juejin.im/post/5c021da16fb9a049e65ffcbf 完全理解CAS機制,由於CAS在整個ReetrantLock中隨處可見,它是lock的基礎。安全
網上有許多相似文章,可是這一部分的東西比較抽象,須要不斷理解,本文將基於源碼分析concurrent包的最核心的組件AQS,將很差理解的部分儘可能用圖片來分析完全理解ReetrantLock的原理性能優化
這部分是concurrent包的核心,理解了以後再去理解SemaPhore LinkedBlockingQueue ArrayBlockingQueue 等就信手拈來了,因此會花比較多的篇幅bash
先大概看一看lock接口數據結構
public interface Lock {
// 加鎖
void lock();
// 可中斷獲取鎖,獲取鎖的過程當中能夠中斷。
void lockInterruptibly() throws InterruptedException;
//當即返回的獲取鎖,返回true表示成功,false表示失敗
boolean tryLock();
//根據傳入時間當即返回的獲取鎖,返回true表示成功,false表示失敗
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解鎖
void unlock();
//獲取等待的條件隊列(以後會詳細分析)
Condition newCondition();
}複製代碼
而咱們通常使用ReetrantLock:多線程
Lock lock = new ReentrantLock();
lock.lock();
try{
//業務代碼......
}finally{
lock.unlock();
}複製代碼
它在使用上是比較簡單的,在正式分析以前咱們先看看什麼是公平鎖和非公平鎖併發
公平鎖是指多個線程按照申請鎖的順序來獲取鎖,線程直接進入FIFO隊列,隊列中的第一個線程才能得到鎖。框架
用一個打水的例子來理解:函數
公平鎖的優勢是等待鎖的線程不會夯死。缺點是吞吐效率相對非公平鎖要低,等待隊列中除第一個線程之外的全部線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。
非公平鎖是多個線程加鎖時直接嘗試獲取鎖,獲取不到纔會到等待隊列的隊尾等待。但若是此時鎖恰好可用,那麼這個線程能夠無需阻塞直接獲取到鎖,因此非公平鎖有可能出現後申請鎖的線程先獲取鎖的場景。
非公平鎖的優勢是能夠減小喚起線程的開銷(由於可能有的線程能夠直接獲取到鎖,CPU也就不用喚醒它),因此總體的吞吐效率高。缺點是處於等待隊列中的線程可能會夯死(試想剛好每次有新線程來,它恰巧都每次獲取到鎖,此時還在排隊等待獲取鎖的線程就悲劇了),或者等好久纔會得到鎖。
公平鎖和非公平鎖的差別在因而否按照申請鎖的順序來獲取鎖,非公平鎖可能會出現有多個線程等待時,有一我的品特別的好的線程直接沒有等待而直接獲取到了鎖的狀況,他們各有利弊;ReetrantLock在構造時默認是非公平的,能夠經過參數控制。
這裏以ReentrantLock爲例,簡單講解ReentrantLock與AQS的關係
從上圖咱們能夠總結:
1. 首先爲何要有Sync這個內部類呢?
2. AQS爲何要聲明爲Abstract,內部卻沒有任何abstract方法?
這是由於AQS只是做爲一個基礎組件,從上圖能夠看出countDownLatch,Semaphore等併發組件都依賴了它,它並不但願直接做爲直接操做類對外輸出,而更傾向於做爲一個基礎併發組件,爲真正的實現類提供基礎設施,例如構建同步隊列,控制同步狀態等。
AQS是採用模板方法的設計模式,它做爲基礎組併發件,封裝了一層核心併發操做(好比獲取資源成功後封裝成Node加入隊列,對隊列雙向鏈表的處理),可是實現上分爲兩種模式,即共享模式(如Semaphore)與獨佔模式(如ReetrantLock,這兩個模式的本質區別在於多個線程能不能共享一把鎖),而這兩種模式的加鎖與解鎖實現方式是不同的,但AQS只關注內部公共方法實現並不關心外部不一樣模式的實現,因此提供了模板方法給子類使用:例如:
ReentrantLock須要本身實現tryAcquire()方法和tryRelease()方法,而實現共享模式的Semaphore,則須要實現tryAcquireShared()方法和tryReleaseShared()方法,這樣作的好處?由於不管是共享模式仍是獨佔模式,其基礎的實現都是同一套組件(AQS),只不過是加鎖解鎖的邏輯不一樣罷了,更重要的是若是咱們須要自定義鎖的話,也變得很是簡單,只須要選擇不一樣的模式實現不一樣的加鎖和解鎖的模板方法便可。
ReetrantLock:實現了lock接口,內部類有Sync、NonfairSync、FairSync(他們三個是繼承了AQS)這裏用了模板方法的設計模式。
以前介紹AQS是提供基礎設施,如構建同步隊列,控制同步狀態等,它的工做原理是怎樣的呢?
咱們先看看AQS類中幾個重要的字段:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//指向同步隊列隊頭
private transient volatile Node head;
//指向同步的隊尾
private transient volatile Node tail;
//同步狀態,0表明鎖未被佔用,1表明鎖已被佔用
private volatile int state;
//省略.
}
複製代碼
再看看Node這個內部類:它是對每個訪問同步代碼塊的線程的封裝
關於等待狀態,咱們暫時只需關注SIGNAL 和初始化狀態便可
AQS本質上就是由node構成的雙向鏈表,內部有node head和node tail。
AQS經過定義的state字段來控制同步狀態,當state=0時,說明沒有鎖資源被站東,當state=1時,說明有線程目前正在使用鎖的資源,這個時候其餘線程必須加入同步隊列進行等待;
既然要加入隊列,那麼AQS是內部經過內部類Node構成FIFO的同步隊列實現線程獲取鎖排隊,同時利用內部類ConditionObject構建條件隊列,當調用condition.wait()方法後,線程將會加入條件隊列中,而當調用signal()方法後,線程將從條件隊列移動到同步隊列中進行鎖競爭。注意這裏涉及到兩種隊列,一種的同步隊列,當鎖資源已經被佔用,而又有線程請求鎖而等待的後將加入同步隊列等待,而另外一種則是條件隊列(可有多個),經過Condition調用await()方法釋放鎖後,將加入等待隊列。
條件隊列能夠暫時先放一邊,下一節再詳細分析,由於當咱們調用ReetrantLock.lock()方法時,實際操做的是基於node結構的同步隊列,此時AQS中的state變量則是表明同步狀態,加鎖後,若是此時state的值爲0,則說明當前線程能夠獲取到鎖,同時將state設置爲1,表示獲取成功。若是調用ReetrantLock.lock()方法時state已爲1,也就是當前鎖已被其餘線程持有,那麼當前執行線程將被封裝爲Node結點加入同步隊列等待。
如上圖所示爲AQS的同步隊列模型;
接下來咱們看詳細實現
AQS的實現依賴於內部的同步隊列(就是一個由node構成的FIFO的雙向鏈表對列)來完成對同步狀態(state)的管理,當前線程獲取鎖失敗時,AQS會將該線程封裝成一個Node並將其加入同步隊列,同時會阻塞當前線程,當同步資源釋放時,又會將頭結點head中的線程喚醒,讓其嘗試獲取同步狀態。這裏從ReetrantLock入手分析AQS的具體實現,咱們先以非公平鎖爲例進行分析。
來看ReetrantLock的源碼:
//默認構造,建立非公平鎖NonfairSync
public ReentrantLock() {
sync = new NonfairSync();
}
//根據傳入參數建立鎖類型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//加鎖操做
public void lock() {
sync.lock();
}
複製代碼
這裏說明ReetrantLock默認構造方法就是構造一個非公平鎖,調用lock方法時候:
/**
* 非公平鎖實現
*/
static final class NonfairSync extends Sync {
//加鎖
final void lock() {
//執行CAS操做,本質就是CAS更新state:
//判斷state是否爲0,若是爲0則把0更新爲1,並返回true不然返回false
if (compareAndSetState(0, 1))
//成功則將獨佔鎖線程設置爲當前線程
setExclusiveOwnerThread(Thread.currentThread());
else
//不然再次請求同步狀態
acquire(1);
}
}
複製代碼
也就是說,經過CAS機制保證併發的狀況下只有一個線程能夠成功將state設置爲1,獲取到鎖;
此時,其它線程在執行compareAndSetState時,由於state此時不是0,因此會失敗並返回false,執行acquire(1);
public final void acquire(int arg) {
//再次嘗試獲取同步狀態
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
這裏傳入參數arg是state的值,由於要獲取鎖,而status爲0時是釋放鎖,1則是獲取鎖,因此這裏通常傳遞參數爲1,進入方法後首先會執行tryAcquire(1)方法,在前面分析過該方法在AQS中並無具體實現,而是交由子類實現,所以該方法是由ReetrantLock類內部類實現的
//NonfairSync類
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
複製代碼
假設有三個線程:線程1已經得到到了鎖,線程2正在同步隊列中排隊,此時線程3執行lock方法嘗試獲取鎖的時,線程1正好釋放了鎖,將state更新爲0,那麼線程3就可能在線程2尚未被喚醒以前去獲取到這個鎖。
若是此時尚未獲取到鎖(nonfairTryAcquire返回false),那麼接下來會把該線程封裝成node去同步隊列裏排隊,代碼層面上執行的是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
ReetrantLock爲獨佔鎖,因此傳入的參數爲Node.EXCLUSIVE
private Node addWaiter(Node mode) {
//將請求同步狀態失敗的線程封裝成結點
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//若是是第一個結點加入確定爲空,跳過。
//若是非第一個結點則直接執行CAS入隊操做,嘗試在尾部快速添加
if (pred != null) {
node.prev = pred;
//使用CAS執行尾部結點替換,嘗試在尾部快速添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//若是第一次加入或者CAS操做沒有成功執行enq入隊操做
enq(node);
return node;
}
複製代碼
其中tail是AQS的成員變量,指向隊尾(這點前面的咱們分析過AQS維持的是一個雙向的鏈表結構同步隊列),若是第一次獲取到鎖,AQS尚未初始化,則爲tail確定爲空,那麼將執行enq(node)操做,若是非第一個結點即tail指向不爲null,直接嘗試執行CAS操做加入隊尾(再一次使用CAS操做實現線程安全),若是CAS操做失敗或第一次加入同步隊列仍是會執行enq(node),繼續看enq(node):
private Node enq(final Node node) {
//死循環
for (;;) {
Node t = tail;
//若是隊列爲null,即沒有頭結點
if (t == null) { // Must initialize
//建立並使用CAS設置頭結點
if (compareAndSetHead(new Node()))
tail = head;
} else {//隊尾添加新結點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
這個方法使用一個死循環進行CAS操做,能夠解決多線程併發問題。這裏作了兩件事:
一是隊列不存在的建立新結點並初始化tail、head:使用compareAndSetHead設置頭結點,head和tail都指向head。
二是隊列已存在,則將新結點node添加到隊尾。
注意addWaiter和enq這兩個方法都存在一樣的代碼將線程設置爲同步隊列的隊尾:
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}複製代碼
這是由於,在多線程環境中,假設線程一、二、三、4同時執行addWaiter()方法入隊,而此時頭節點不爲null,那麼他們會同時執行addWaiter中的compareAndSetTail方法將隊尾指向它,添加到隊尾。
但這個時候CAS操做保證只有一個能夠成功,假設此時線程1成功添加到隊尾,那麼線程二、三、4執行CAS都會失敗,那麼線程二、三、4會在enq這個方法內部死循環執行compareAndSetTail方法將隊尾指向它,直到成功添加到隊尾爲止。enq這個方法在內部對併發狀況下進行補償。
回到以前的acquire()
方法,添加到同步隊列後,結點就會進入一個自旋過程,自旋的意思就是原地轉圈圈:即結點都在觀察時機準備獲取同步狀態,自旋過程是在acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法中執行的,先看前半部分
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋,死循環
for (;;) {
//獲取前結點
final Node p = node.predecessor();
當且僅當p爲頭結點才嘗試獲取同步狀態,FIFO
if (p == head && tryAcquire(arg)) {
//此時當前node前驅節點爲head且已經tryAcquire獲取到了鎖,正在執行了它的相關信息
//已經沒有任何用處了,因此如今須要考慮將它GC掉
//將node設置爲頭結點
setHead(node);
//清空原來頭結點的引用便於GC
p.next = null; // help GC
failed = false;
return interrupted;
}
//若是前驅結點不是head,判斷是否掛起線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//最終都沒能獲取同步狀態,結束該線程的請求
cancelAcquire(node);
}
}
複製代碼
//設置爲頭結點
private void setHead(Node node) {
head = node;
//清空結點數據以便於GC
node.thread = null;
node.prev = null;
}
複製代碼
死循環中,若是知足了if (p == head && tryAcquire(arg))
以下圖,會執行sethead方法:
固然若是前驅結點不是head而它又沒有獲取到鎖,那麼執行以下:
//若是前驅結點不是head,判斷是否掛起線程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted = true;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取當前結點的等待狀態
int ws = pred.waitStatus;
//若是爲等待喚醒(SIGNAL)狀態則返回true
if (ws == Node.SIGNAL)
return true;
//若是ws>0 則說明是結束狀態,
//遍歷前驅結點直到找到沒有結束狀態的結點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//若是ws小於0又不是SIGNAL狀態,
//則將其設置爲SIGNAL狀態,表明該結點的線程正在等待喚醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//將當前線程掛起,線程會阻塞住
LockSupport.park(this);
//獲取線程中斷狀態,interrupted()是判斷當前中斷狀態,
//並不是中斷線程,所以可能true也可能false,並返回
return Thread.interrupted();
}
複製代碼
這段代碼有個設計比較好的點:
一般咱們在設計隊列時,咱們須要考慮如何最大化的減小後續排隊節點對於CPU的消耗,而在AQS中,只要當前節點的前驅節點不是頭結點,再把當前節點加到隊列後就會執行LockSupport.park(this);將當前線程掛起,這樣能夠最大程度減小CPU消耗。
是否是仍是有點一頭霧水?
不要緊,爲了方便理解:咱們假設ABC三個線程如今同時去獲取鎖,A首先獲取到鎖後一直不釋放,BC加入隊列。那麼對於AQS的同步隊列結構是如何變化的呢?
一、A直接獲取到鎖:
代碼執行路徑:
(ReetranLock.lock()-> compareAndSetState(0, 1) -> setExclusiveOwnerThread(Thread.currentThread())
此時AQS結構尚未初始化:
二、B嘗試獲取鎖:
由於A存在把state設置爲1,因此B獲取鎖失敗,進行入隊操做加入同步隊列,入隊時發現AQS尚未初始化(AQS中的tail爲null),會在入隊前初始化代碼在enq方法的死循環中:
初始化以後改變tail的prev指向,把本身加到隊尾:
接着會執行acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}複製代碼
第一次執行:發現本身前序節點是head節點,因而乎再次嘗試獲取鎖,獲取失敗後再shouldParkAfterFailedAcquire方法中把前序節點設置爲Singal狀態
第二次執行:再次嘗試獲取鎖,但由於前序節點是Signal狀態了,因此執行parkAndCheckInterrupt把本身休眠起來進行自旋
三、C嘗試獲取鎖:
C獲取鎖和B徹底同樣,不一樣的是它的前序節點是B,因此它並不會一直嘗試獲取鎖,只會呆在B後面park住
AQS經過最簡單的CAS和LockSupport的park,設計出了高效的隊列模型和機制:
一、AQS結構實際上是在第二個線程獲取鎖的時候再初始化的,就是lazy-Init的思想,最大程度減小沒必要要的代碼執行的開銷
二、爲了最大程度上提高效率,儘可能避免線程間的通信,採用了雙向鏈表的Node結構去存儲線程
三、爲了最大程度上避免CPU上下文切換執行的消耗,在設計排隊線程時,只有頭結點的下一個的線程在一直重複執行獲取鎖,隊列後面的線程會經過LockSupport進行休眠。
上代碼:
//ReentrantLock類的unlock
public void unlock() {
sync.release(1);
}
//AQS類的release()方法
public final boolean release(int arg) {
//嘗試釋放鎖
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒後繼結點的線程
unparkSuccessor(h);
return true;
}
return false;
}
//ReentrantLock類中的內部類Sync實現的tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//判斷狀態是否爲0,若是是則說明已釋放同步狀態
if (c == 0) {
free = true;
//設置Owner爲null
setExclusiveOwnerThread(null);
}
//設置更新同步狀態
setState(c);
return free;
}
複製代碼
一句話總結:釋放鎖首先就是把volatile類型的變量state減1。state從1變成0.
unparkSuccessor(h)的做用的喚醒後續的節點:
private void unparkSuccessor(Node node) {
//這裏,node是head節點。
int ws = node.waitStatus;
if (ws < 0)//置零當前線程所在的結點狀態,容許失敗。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一個須要喚醒的結點s
if (s == null || s.waitStatus > 0) {//若是爲空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//從這裏能夠看出,<=0的結點,都是還有效的結點。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
複製代碼
從代碼執行操做來看,這裏主要做用是用unpark()喚醒同步隊列中最前邊未放棄線程(也就是狀態爲CANCELLED的線程結點s)。此時,回憶前面分析進入自旋的函數acquireQueued(),s結點的線程被喚醒後,會進入acquireQueued()函數的if (p == head && tryAcquire(arg))的判斷,而後s把本身設置成head結點,表示本身已經獲取到資源了,最終acquire()也返回了,這就是獨佔鎖釋放的過程。
回到以前的圖:A B C三個線程獲取鎖,A已經獲取到了鎖,BC在隊列裏面,此時A釋放鎖
執行b.unpark,B被喚醒後繼續執行
if (p == head && tryAcquire(arg))
複製代碼
由於B的前序節點是head,因此會執行tryAcquire方法嘗試獲取鎖,獲取到鎖以後執行setHead方法把本身設置爲頭節點,而且把以前的頭結點也就是上圖中的new Node()設置爲null以便於GC:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//獲取到鎖以後將當前node設置爲頭結點 head指向當前節點node
setHead(node);
//p.next就是以前的頭結點,它沒有用了,因此把它gc掉
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}複製代碼
總之,AQS內部有一個同步隊列,線程獲取同步狀態失敗以後會被封裝成node經過park進行自旋,而在釋放同步狀態時,經過unpark進行喚醒後面一個線程,讓後面線程得以繼續獲取鎖。
瞭解完ReetrantLock中非公平鎖的實現後,咱們再來看看公平鎖。與非公平鎖不一樣的是,在獲取鎖的時,公平鎖的獲取順序是徹底遵循時間上的FIFO規則,也就是說先請求的線程必定會先獲取鎖,後來的線程確定須要排隊。
下面比較一下公平鎖和非公平鎖lock方法:
再比較一下公平鎖和非公平鎖lock方法:tryAcquire方法:
惟一的差異就是hasQueuedPredecessors()
判斷同步隊列是否存在結點,這就是非公平鎖與公平鎖最大的區別,即公平鎖在線程請求到來時先會判斷同步隊列是否存在結點,若是存在先執行同步隊列中的結點線程,當前線程將封裝成node加入同步隊列等待。而非公平鎖呢,當線程請求到來時,無論同步隊列是否存在線程結點,直接上去嘗試獲取同步狀態,獲取成功直接訪問共享資源,但請注意在絕大多數狀況下,非公平鎖纔是咱們理想的選擇,畢竟從效率上來講非公平鎖老是勝於公平鎖。
以上即是ReentrantLock的內部實現原理,這裏咱們簡單進行小結,重入鎖ReentrantLock,是一個基於AQS併發框架的併發控制類,其內部實現了3個類,分別是Sync、NoFairSync以及FairSync類,其中Sync繼承自AQS,實現了釋放鎖的模板方法tryRelease(int),而NoFairSync和FairSync都繼承自Sync,實現各類獲取鎖的方法tryAcquire(int)。
ReentrantLock的全部方法實現幾乎都間接調用了這3個類,所以當咱們在使用ReentrantLock時,大部分使用都是在間接調用AQS同步器中的方法。
AQS在設計時將性能優化到了極致,具體體如今同步隊列的park和unpark,初始化AQS時的懶加載,以及線程之間經過Node這樣的數據結構從而避免線程間通信形成的額外開銷,這種由釋放鎖的線程主動喚醒後續線程的方式也是咱們再實際過程當中能夠借鑑的。
AQS還不止於同步隊列,接下來咱們會繼續探討AQS的條件隊列