舒適提醒
- AbstractQueuedSynchronizer隊列是CLH隊列的變種,CLH隊列等待採用自旋,AQS的隊列等待採用LockSupport#park。
- Node.waitStatus表示對應線程是否應當阻塞,
- head節點是正佔有鎖的線程的,其thread值爲null,處於head後驅節點的線程纔會去tryAcquire,tryAcquire由子類實現。
- 入隊在tail,出隊在head
如下必需要子類實現:
/**
* exclusive mode
*/
boolean tryAcquire(int arg)
/**
* exclusive mode.
*/
boolean tryRelease(int arg)
/**
* shared mode.
*/
int tryAcquireShared(int arg)
/**
* shared mode.
*/
boolean tryReleaseShared(int arg)
/**
* Returns true if synchronization is held exclusively with
* respect to the current (calling) thread.
*/
boolean isHeldExclusively()
獨佔模式acquire
private Node enq(final Node node) {
// 無限循環,即step 1返回false(有別的線程將它的node鏈接到tail)也會從新將node鏈接到tail
for (;;) {
Node t = tail;
if (t == null) { // new一個不帶任何狀態的Node做爲頭節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // step 1
t.next = node;
return t; // 返回當前tail
}
}
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//-- 和#enq邏輯比,只是取消了循環,爲了更快?
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//--
enq(node);
return node; // 返回當前線程的node
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 無限循環,直到當前線程node的前驅node是head,不然對於node狀態爲SIGNAL的線程會park
for (;;) {
final Node p = node.predecessor();
// 若是當前線程node的前驅是head
if (p == head && tryAcquire(arg)) {
// head = node; 從而讓其餘線程也能走入該if
// node.thread = null; 因此head永遠是一個不帶Thread的空節點
// node.prev = null;
setHead(node);
p.next = null; // 配合上面的 node.prev = null; for GC
failed = false;
return interrupted;
}
// 判斷在tryAcquire失敗後是否應該park,如果,則執行park
if (shouldParkAfterFailedAcquire(p, node) &&
// LockSupport#park,返回Thread#interrupted
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 表示當前節點應當park
if (ws == Node.SIGNAL) return true;
// 當前節點不斷向前找,直到找到一個前驅節點waitStats不是CANCELLED的爲止(狀態值裏面只有CANCELLED是大於0的)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 中間CANCELLED的node做廢
pred.next = node;
}
// 0 or PROPAGATE 須要設置爲SIGNAL,但仍然返回false,即don’t park
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// Acquires in exclusive mode, ignoring interrupts
public final void acquire(int arg) {
// 這裏提早tryAcquire爲了省去入隊列操做,提升性能,由於大部分狀況下可能都沒有鎖競爭
if (!tryAcquire(arg) &&
// 入隊列,返回當前線程中斷狀態
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
獨佔模式release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) // step 1
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 這裏是SIGNAL,將head的waitStatus設爲0,是爲了避免重複step 1
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 若是head(當前線程)無後驅node,或後驅node爲CANCELLED
if (s == null || s.waitStatus > 0) {
s = null;
// 從鏈表tail開始遍歷,取出非CANCELLED的node
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 非CANCELLED的線程unpark,繼續#acquireQueued的for循環
if (s != null)
LockSupport.unpark(s.thread);
}