隊列同步器(AbstractQueuedSynchronizer)是構建鎖和其餘同步組件的基礎框架。看一下源碼的介紹:java
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.node
它使用FIFO隊列來完成資源獲取線程的排隊工做,使用一個int變量來表示同步狀態。app
Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.框架
隊列同步器(如下簡稱AQS)使用以下三個方法來訪問和修改同步狀態:ide
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:ui
AQS可重寫的5種方法默認都拋出UnsupportedOperationException,分別爲獨佔式獲取同步狀態tryAcquire、獨佔式釋放同步狀態tryRelease、共享式獲取同步狀態tryAcquireShared、共享式釋放同步狀態tryReleaseShared、當前同步器是否在獨佔模式下被線程佔用isHeldExclusively。this
實現自定義同步組件時將會調用同步器提供的模板方法。模板方法可分爲三類:獨佔式獲取和釋放同步準備狀態、共享式獲取和釋放同步狀態、查詢同步隊列中的等待線程狀態。這些方法以下:atom
方法名稱 | 描述 |
---|---|
void acquire(int arg) | 獨佔式獲取同步狀態,若是當前線程獲取同步狀態成功,則由該方法返回,不然將會進入同步隊列等待。該方法將會調用重寫的tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 與acquire(int arg)相同,但該方法響應中斷。若是該線程被中斷,則該方法會拋出中斷異常並返回。 |
boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly(int arg)基礎上增長了超時限制和返回值。 |
void acquireShared(int arg) | 共享式獲取同步狀態,若是當前線程未獲取到同步狀態,將會進入同步隊列等待。 |
void acquireSharedInterruptibly(int arg) | 與acquireShared(int arg)相同,但該方法響應中斷。 |
boolean tryAcquireSharedNanos(int arg,long nanos) | 在acquireSharedInterruptibly(int arg)基礎上增長了超時限制和返回值。 |
boolean release(int arg) | 獨佔式的釋放同步狀態。 |
boolean releaseShared(int arg) | 共享式的釋放同步狀態 |
Collection getQueuedThreads() | 獲取等待在同步隊列上的線程集合 |
當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構形成爲一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程。Node源碼以下:spa
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
複製代碼
等待狀態狀態共有5種,分別是0、CANCELLED、SIGNAL、CONDITION、PROPAGATE。CANCELLED表示同步隊列中等待的線程等待超時或者被中斷,節點進入該狀態將再也不變化。SIGNAL:後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消將會通知後繼節點。CONDITION:節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()方法後,該節點將會從等待隊列中轉移到同步隊列中。 回到上面的獨佔式狀態獲取方法acquire(int arg):線程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
上述代碼主要完成了同步狀態獲取、節點構造、加入同步隊列以及在同步隊列中自旋等待的相關工做。節點構造和加入隊列源碼以下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
複製代碼
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
節點進入同步隊列以後,就進入到一個自旋的過程。每一個節點都在自省地觀察,當條件知足,獲取到了同步狀態,就能夠從這個自旋過程當中退出。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
當前線程獲取同步狀態並執行了相應的邏輯以後,就須要釋放同步狀態,使得後續節點可以繼續獲取同步狀態release(int arg)代碼以下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼