AQS能夠說是JAVA源碼中必讀源碼之一。同時它也是JAVA大廠面試的高頻知識點之一。認識並瞭解它,JAVA初中升高級工程師必備知識點之一。 AQS是AbstractQueuedSynchronizer的簡稱,它也是JUC包下衆多非原生鎖實現的核心。node
AQS是基於CLH隊列算法改進實現的鎖機制。大致邏輯是AQS內部有一個鏈型隊列,隊列結點類是AQS的一個內部類Node,造成一個相似以下Sync Queue(記住這個名詞)面試
能夠看出,一個Node除了先後結點的索引外,還維護了一個Thread對象,一個int的waitStatus。 Thread對象就是處於競爭隊列中的線程對象自己。 waitStatus表示當前競爭結點的狀態,這裏暫且忽略掉。算法
處於隊首的,即Head所指向的結點,即爲獲取到鎖的結點。釋放鎖即爲出隊,後續結點則成爲隊首,即獲取到鎖bash
Tips:這裏幫你們理解一個事情,每個ReentrantLock實例都有且只有一個AQS實例,一個AQS實例維護一個Sync Queue。因此說,當咱們的業務代碼中的多個線程對同一個ReentrantLock實例進行鎖競爭操做時,其實際就是對同一個Sync Queue的隊列進行入隊、出隊操做。函數
咱們在用ReentrantLock時,代碼一般以下:ui
ReentrantLock lock = new ReentrantLock();
Runnable runnable = new Runnable() {
public void run() {
lock.lock();
Sys.out(Thread.currentThread().name() + "搶到鎖");
lock.unlock();
}
};
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t2.start();
t1.start();
複製代碼
可見線程t1,t2競爭lock這個鎖。 lock.lock()
搶鎖 lock.unlock()
釋放鎖 來看入口函數lock
this
public void lock() {
sync.lock();
}
複製代碼
sync
是ReentrantLock
內的一個繼承了AQS
的抽象類spa
abstract static class Sync extends AbstractQueuedSynchronizer {
}
複製代碼
抽象類的具體實現是另兩個內部類NonFairSync
FairSync
,分別表明非公平鎖、公平鎖。線程
咱們系統來看下繼承圖 code
ReentrantLock中的sync實例,默認是在構造函數中初始化的,
public ReentrantLock() {
sync = new NonfairSync();
}
複製代碼
那咱們就看默認的NonFairSync
的實現邏輯。
當咱們調用ReentrantLock.lock()
時,直接調用到的是NonFairSync.lock()
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
複製代碼
compareAndSetState(0, 1)
經過CAS(CAS是啥,這裏不講,參考樂觀鎖。具體Google吧)方式,設置AQS的int state
字段爲1。AQS內部就是經過這個state
是否爲0來判斷當前鎖是否已經被線程獲取到。 返回true
,則說明獲取鎖成功,設置當前鎖的獨佔線程setExclusiveOwnerThreaThread.currentThread());
不然,acquire(1)
嘗試獲d(取鎖。這個方法會自旋、阻塞,一直到獲取鎖成功爲止。這裏,傳進去的參數1,就參考樂觀鎖的版本字段,同時,這裏,它還記錄了可重入鎖重複獲取到鎖的次數。只有釋放一樣次數才能最終釋放鎖。
具體看AQS的acquire()
的邏輯
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
要注意,這個方法是忽略線程中斷的。 先看tryAcquire(arg)
,這個方法是AQS留給子實現類的口子,具體實現看NonFairSync
,它的實現裏直接調用了Sync.nonfairTryAcquire()
,
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
能夠看到方法內,先是嘗試獲取state = 0
時的初始鎖,若是失敗,判斷當前鎖是不是被當前線程獲取,是的話,將acquire
時傳入的參數累加到state
字段上。在這裏,這個字段就是用來記錄重複獲取鎖的次數。 獲取失敗則返回false
回到acquire
在獲取失敗,返回false
後,纔會繼續調用 &&
右邊的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法。 先看addWaiter
private Node addWaiter(Node mode) {
1: Node node = new Node(Thread.currentThread(), mode);
2: // Try the fast path of enq; backup to full enq on failure
3: Node pred = tail;
4: if (pred != null) {
5: node.prev = pred;
6: if (compareAndSetTail(pred, node)) {
7: pred.next = node;
8: return node;
9: }
10: }
11: enq(node);
12: return node;
13: }
複製代碼
利用傳進來的Node.EXCLUSIVE
表示的排斥鎖參數以及當前線程實例初始化新Node
,第4-10
行代碼是在Sync Queue
隊列內有競爭線程時進入。爲空時會走到enq(node)
,這裏是在競爭爲空時將競爭線程入隊的操做。 而後返回當前競爭的線程node
此時Sync Queue
如圖
返回acquire
接着看 addWaiter
返回的node
直接做爲參數給了acquireQueued
,這個方法就是主要的node
競爭鎖方法。
final boolean acquireQueued(final Node node, int arg) {
// 獲取鎖成功失敗標記
boolean failed = true;
try {
// 當前競爭線程的中斷標記
boolean interrupted = false;
// 自旋競爭鎖,競爭不到鎖的話,線程又沒有中斷
// 則一直在這兒循環
for (;;) {
// 獲取當前線程的前驅結點
final Node p = node.predecessor();
// 若是前驅結點是頭結點,則嘗試去tryAcquire
// 這個邏輯咱們以前看過,當前線程未獲取到鎖的狀況下
// 在AQS的state字段不爲0時,則返回false
if (p == head && tryAcquire(arg)) {
// 進入到這裏,說明要不就是當前線程在重複獲取
// 要不就是前邊的結點釋放鎖,state 歸0,這裏獲取到
setHead(node);
p.next = null; // help GC
// 標識競爭鎖成功
failed = false;
// 這個方法不響應線程中斷,可是會返回線程在競爭鎖過程當中
// 中斷標記返回
return interrupted;
}
// 若獲取鎖失敗,則到這裏。這裏的邏輯主要在If判斷
// 的兩個方法中,用來將當前線程掛起的,具體邏輯
// 看下面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
park就是停下的意思。因此這個方法從名字上也比較好理解,就是 掛起線程而且檢查線程的中斷狀態。這裏要注意,LockSupport.part(this)
方法是會在線程中斷時自動喚醒的
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
這個方法傳入了當前競爭結點及其前驅結點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驅結點的等待狀態。這裏只須要記住,咱們這裏考慮的是
// 非共享鎖、非公平鎖的AQS。因此,只須要確保當前競爭
// 結點的前驅結點狀態爲SIGNAL就好。剩下的狀態,
// 與咱們此時研究的狀況而言沒有用
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 若是前驅結點的status爲0,則將其改成SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 複製代碼
從shouldParkAfterFailedAcquire
能夠看出來,在確保前驅結點status爲SIGNAL
時,就能夠放心的去unsafe.park()
了。之因此要爲SIGNAL
,是由於這個狀態含義爲:當前結點OVER時要喚醒後繼結點。
因此不難推出,咱們的結點如今就park
在那了。等他前驅結點釋放鎖,或者本身interrupt來喚醒,但由於這個方法是無視中斷的,因此即便interrupt了,只是設置了一個標記位,但仍然在循環中。
這裏假設前驅結點獲取鎖後釋放,則當前結點在parkAndCheckInterrupt()
方法中被喚醒,然後再次循環for(;;)
,此次會在第一個if
中就進入,當前結點獲取到鎖,而後重置Head
指向的結點等,返回當前線程的中斷標記。
返回acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
if
中的selfInterrupt()
方法只是去從新設置當前線程的中斷標記位。這是由於獲取線程中斷狀態的方法,在返回狀態字段的同時,也會重置字段,因此須要標記後從新設置相應的值。
下面咱們看下AQS釋放鎖的接口方法
public void unlock() {
sync.release(1);
}
複製代碼
追進去
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
tryRelease
是在NonFairLock
中的實現的,若是是釋放成功,則在Head
存在而且狀態不爲0(其實能夠理解爲值爲SIGNAL
時)去喚醒Head
的後繼結點。
下面看下NonFairLock
的tryRelease
方法的實現
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製代碼
能夠看出,這個tryRelease
其實就是去判斷下是否是當前線程擁有鎖,是的話,判斷下當前的釋放鎖是否徹底釋放,由於鎖能夠重複獲取,徹底釋放的話,就設置state
爲0,表明AQS的鎖已經被釋放了。