AbstractQueuedSynchronizer
先分析下同步器AbstractQueuedSynchronizer
,這個是用於鎖實現的類,ReentrantLock就用到了它java
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
ReentrantLock
的lock()
方法就是調用acquire(int arg)
去作事情的。
其中protected boolean tryAcquire(int arg)
是留給子類去實現的,因此這裏是採用了模板設計模式。這個方法簡單來講就是去獲取鎖。node
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
addWaiter
方法簡單來講是構造節點,而後用CAS
的方式把這個節點加入同步器中節點鏈表的尾巴。設計模式
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //若是前驅結點是頭節點的話,那麼嘗試獲取鎖(也就是同步狀態) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //若是失敗的話 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued()
方法中,當前線程在「死循環」中嘗試獲取同步狀態,而且只有前驅節點是頭節點纔可以嘗試獲取同步狀態。安全
/* * 查詢是否有線程比當前線程更早地請求獲取鎖 */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //若是h==t,返回false,表示沒有。h==t的時候要麼沒有等待者,要麼只有一個。當只有一個的時候, //那麼這一個節點確定是首節點,而首節點中的線程一定是獲取了鎖的。 // h!=t&&((s = h.next) == null || s.thread != Thread.currentThread()) //若是h!=t,則進入後面的判斷 //當頭節點的後繼節點爲null,可是這個時候tail爲null(head和tail節點都是懶初始 //化),或者頭節點的後繼節點不爲null,可是頭節點的後繼節點中的線程不是當前線程,則返回true return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
若是等待隊列爲空,或者只有首節點,或者首節點的後繼節點中的線程是當前線程,那麼當前線程就能夠去獲取同步狀態less
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ /* *若是這個節點的waitStatus已經被標記爲SIGNAL(-1)了的話,那麼 *這個節點就須要park,因此方法返回true */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ /* * 該節點的前驅節點若是被取消了(waitStatus爲1時表示取消狀態,目前只有這個狀態的值大於 * 0),那麼跳過前驅節點(經過死循環的方式把前驅結點的前驅結點設置爲本身的前驅結點)。然 * 後退出if條件語句,調到末尾,返回false,表示本身還能夠搶救一下,能夠進行獲取的鎖的嘗 * 試,而不是park。 * */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 若是以上狀況都不成立的話,那麼就把本身的waitStatus標記爲SIGNAL,返回true,表示本身要 * park,戰略性投降。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable { ..... abstract static class Sync extends AbstractQueuedSynchronizer /** public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ..... }
ConditionObject
await()
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //線程將會在調用park方法後阻塞,直到被從新喚醒,從condition隊列加入同步隊列,從await()方 //法中的while循環中退出(isOnSyncQueue(Node node)方法返回true,節點已經在同步隊列中), //進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); //成功獲取同步狀態(或者說鎖)以後,被喚醒的線程將從先前調用的await()方法返回,此時該線程已 //經成功地獲取了鎖。 } /** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final long fullyRelease(Node node) { boolean failed = true; try { long savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { //若是釋放同步狀態失敗,就throws exception,在finally那裏還會Cancels node throw new IllegalMonitorStateException(); } } finally { //若是釋放同步狀態失敗,就Cancels node if (failed) node.waitStatus = Node.CANCELLED; } }
signal()
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
Condition
的signal()
方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點以前,會將節點移到同步隊列中。signal()
方法進行了isHeldExclusively()
檢查,也就是當前線程必須是獲取了鎖的線程。接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport
喚醒節點中的線程。/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { //當沒有waiters時進行的一些優化,以便編譯器進行方法內聯,transferForSignal方法纔是重點 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) 把節點從condition隊列「傳輸」到同步隊列去。」傳輸「成功或者在喚醒前節點已經取消,就返回true。 */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). * 經過調用同步器的enq(Node node)方法,等待隊列中的頭節點線程安全地移動到同步隊列。 */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //當節點移動到同步隊列後,當前線程再使用LockSupport喚醒該節點的線程。 LockSupport.unpark(node.thread); return true; }