那麼對象如何與監視器關聯呢,在 Java 中,對象包含三塊:對象頭、實例數據、填充數據,其中對象頭中就包含 Mark Word,Mark Word 通常存儲對象的 hashCode、GC分代年齡以及鎖信息,鎖信息就包含指向互斥量(重量級鎖)的指針,指向了一個監視器;監視器是經過 ObjectMonitor 來實現的,代碼以下:多線程
while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
複製代碼
release 操做以下:
update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;
複製代碼
爲了實現上述 acquire、release 操做,須要完成如下三個功能:
同步狀態的原子性管理;
線程的阻塞與喚醒;
排隊機制;
同步狀態
AQS 類經過使用單個 int 類型來保存同步狀態,並提供了 getState()、 setState(int)、compareAndSetState(int, int) 三個方法來讀取和更新狀態,而且此同步狀態經過關鍵字 volatile 修飾,保證了多線程環境下的可見性,compareAndSetState(int, int) 是經過 CAS(Compare and swap,比較並交換) 實現的,當多個線程同時對某個資源進行 CAS 操做的時候,只能有一個線程操做成功,但並不會阻塞其餘線程,其餘線程會收到操做失敗的信號,CAS 是一個輕量級的樂觀鎖。CAS 的底層經過 Unsafe 類實現的,利用處理器提供的 CMPXCHG 指令實現其原子性,使得僅當同步器狀態爲一個指望值的時候,纔會被原子的更新成目標值,相比 synchronized 不會致使過多的上下文切換切換和掛起線程。在 java.util.concurrent 包中,大量地使用 CAS 來實現原子性。
AQS 同步隊列是一個 FIFO 隊列,在此同步隊列中,一個節點表示一個線程,它保存着線程的引用、狀態、前驅節點、後繼節點。同步隊列經過兩個節點 tail 和 head 來存取,初始化時,tail、head 初始化爲一個空節點,線程要加入到同步隊列中,經過 CAS 原子地拼接爲新的 tail 節點,線程要退出隊列,只需設置 head 節點指向當前線程節點。
同步隊列的優勢在於其出隊和入隊的操做都是無鎖的、快速的。爲了將 CLH 鎖隊列用於阻塞同步器,該同步隊列須要作些額外的修改以提供一種高效的方式定位某個節點的後繼節點,在自旋鎖中,一個節點只需改變其狀態,下一次自旋中其後繼節點就能注意到這個改變。可是在阻塞式同步器中,一個節點須要顯示地喚醒其後繼節點。同步隊列包含一個 next 連接到它的後繼節點。第二個對 CLH 鎖隊列主要的修改是將每一個節點都有的狀態字段用於控制阻塞而非自旋。
基本的 acquire 操做的最終實現通常形式(互斥、非中斷、無超時):
if(!tryAcquire(arg)) {
node = create and enqueue new node;
pred = node's effective predecessor; while (pred is not head node || !tryAcquire(arg)) { if (pred's signal bit is set)
park();
else
compareAndSet pred's signal bit to true; pred = node's effective predecessor;
}
head = node;
}
複製代碼
release 操做以下:
if(tryRelease(arg) && head node's signal bit is set) { compareAndSet head's bit to false;
unpark head's successor, if one exist } 複製代碼
若是同步狀態的佔用線程爲當前線程,則爲重入性,將當前 state 加 1,不然返回爲 false。緊接着會調用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法,addWaiter(Node.EXCLUSIVE) 方法的源代碼以下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
複製代碼
上述方法在獲取同步狀態失敗的基礎上,建立獨佔模式的節點,若是尾節點不爲空,則將新節點的 prev 指向當前最後一個節點,當前的最後一個節點的 next 指向當前節點,而後 tail 指向當前節點。若是調用失敗,則調用 enq() 方法設置尾節點。
在隊列中加入節點成功後,接下來會調用 acquireQueued(Node,int) 方法,自旋的獲取同步狀態,固然前提是當前節點的 prev 節點是 head 節點才能獲取同步狀態,調用 tryAcquire(int) 方法獲取同步狀態,若是成功獲取到,並將當前節點設置爲 head 節點,並將當前節點 prev 設置爲 null(由於按照 FIFO 的同步隊列原則,由於是非公平鎖的模式,因此即便是第一個節點也有可能被新來的線程搶佔到)。
若是獲取同步狀態失敗,則調用 shouldParkAfterFailedAcquire(node,node) ,則根據條件判斷是否應該阻塞,防止 CPU 處於忙碌狀態,會浪費 CPU 資源,若是 prev 節點狀態是 SIGNAL 則阻塞,若是是 CANCELLED 狀態則向前遍歷,移除前面全部爲該狀態的節點 ,此方法裏面阻塞當前節點用的方法是 LockSupport.park()。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
returntrue;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 複製代碼
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
returntrue;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
複製代碼
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
複製代碼