想研究併發編程快一年了,一直斷斷續續,沒有重點,無所收穫。寫此文,以明志!java
LockSupport,之後要重點看一下,在parkAndCheckInterrupt用到node
conditionObject doReleaseShared編程
本文主要講獨佔、共享地獲取釋放鎖,condition沒講安全
先從數據結構開始,Node與Condition。源碼是最好的老師數據結構
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node();//節點處於共享模式的標誌,即便更改屬性,引用不變。要的只是引用——線程安全 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;//節點處於獨佔模式的標誌 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1;//等待狀態:線程取消了獲取鎖(獨佔\共享)的操做 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1;//後繼(不必定是下一個)節點的線程須要unparking /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2;//線程在條件隊列等待 /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;//獲取鎖的操做須要無條件傳播(共享模式下),表示當前場景下後續的acquireShared可以得以執行 /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block.//只有當前狀態爲signal,才能獲取鎖,否則release\cancel會「忘了」unpark後面的線程 * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.)//代表節點在條件隊列。當條件隊列轉爲同步隊列時,狀態位置0(條件隊列、同步隊列能夠參考wait/notify機制) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened.//releaseShared後,狀態傳播(只有頭結點會) * 0: None of the above //當前節點在sync隊列中,等待着獲取鎖。 * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign.//非負數表示當前節點不須要喚醒後繼節點\傳播狀態 * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus;//等待狀態是volatile,保證多線程間的可見性 /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev;//前驅節點,在出入queue時賦值,非cancel節點。必定存在,由於頭結點非cancel /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next;//後繼節點,釋放鎖時直接鏈到後繼節點。入隊時賦值,前驅節點 取消 時更改,出隊時爲null。 //enq在入隊時賦值,爲null時不必定爲tail。cancel節點的next爲自身,以方便isOnSyncQueue /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread;//使節點入隊的線程,構造時初始化,use後爲null /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter;//條件隊列裏的next,或SHARED.條件隊列只會被獨佔模式訪問,因此只須要簡單的linkedQueue維護(由於線程安全) //也用它表示共享模式 /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode;//獨佔模式、共享模式、條件隊列的next節點。入隊使用 this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition,如條件隊列時使用 this.waitStatus = waitStatus; this.thread = thread; } }
Node主要是由thread,waitStatus組成的數據結構,多個Node組成一個同步隊列<acquire,release>,或條件隊列<wait,notify>多線程
關於ConditionObject,我之後好好研究一下再講併發
/** * Condition implementation for a {@link * AbstractQueuedSynchronizer} serving as the basis of a {@link * Lock} implementation. * * <p>Method documentation for this class describes mechanics, * not behavioral specifications from the point of view of Lock * and Condition users. Exported versions of this class will in * general need to be accompanied by documentation describing * condition semantics that rely on those of the associated * {@code AbstractQueuedSynchronizer}. * * <p>This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } // Internal methods /** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // public methods /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */ /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } /** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } /** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // support for instrumentation /** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } /** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } /** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } /** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. * * @return the collection of threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. * 等待隊列的head,懶加載。除了初始化,只會被setHead修改。若是存在head,他的waiitStatus絕對不是canceled */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. * 等待隊列的tali,懶加載。只會被enq修改 */ private transient volatile Node tail; /** * The synchronization state.同步器的狀態 */ private volatile int state;
下面重點講方法app
如下5個protected方法,不是傳統的abstract方法,是爲了讓子類選擇性實現,如:writeLock只須要實現tryAcquire/tryRelease這種獨佔鎖的方法,readLock只須要實現tryAcquireShared/tryReleaseShared這種共享鎖的方法。ide
// Main exported methods 主要暴露出去的方法,給了子類實現上的靈活性 /** * Attempts to acquire in exclusive mode. This method should query * if the state of the object permits it to be acquired in the * exclusive mode, and if so to acquire it. * 以獨佔的模式獲取鎖。須要查看 狀態 來決定是否能獲取鎖 * <p>This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. This can be used * to implement method {@link Lock#tryLock()}. * 被嘗試獲取鎖的線程調用,會進入\停在隊列,直到被其餘線程release後signal。該方法能夠實現Lock.tryLock * <p>The default * implementation throws {@link UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return {@code true} if successful. Upon success, this object has * been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * Attempts to set the state to reflect a release in exclusive * mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * Attempts to acquire in shared mode. This method should query if * the state of the object permits it to be acquired in the shared * mode, and if so to acquire it. * * <p>This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. * * <p>The default implementation throws {@link * UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** * Attempts to set the state to reflect a release in shared mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this release of shared mode may permit a * waiting acquire (shared or exclusive) to succeed; and * {@code false} otherwise * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * Returns {@code true} if synchronization is held exclusively with * respect to the current (calling) thread. This method is invoked * upon each call to a non-waiting {@link ConditionObject} method. * (Waiting methods instead invoke {@link #release}.) * * <p>The default implementation throws {@link * UnsupportedOperationException}. This method is invoked * internally only within {@link ConditionObject} methods, so need * not be defined if conditions are not used. * * @return {@code true} if synchronization is held exclusively; * {@code false} otherwise * @throws UnsupportedOperationException if conditions are not supported */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
// Queuing utilities /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;//自旋閾值 /** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * 獨佔模式獲取鎖,不響應 中斷 。至少調用一次tryAcquire,若是返回true,結束。不然,線程入隊,在blocking-unblocking * 之間徘徊,直到tryAcquire成功。 * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) {//tryAcquire由子類繼承 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * Creates and enqueues node for current thread and given mode. * 爲當前線程以給定的模式(獨佔、共享)建立node,並enqueue,入隊列 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//構造新節點,有獨佔、共享,兩種模式 // Try the fast path of enq; backup to full enq on failure,fast-fail機制,先部分調用enq,fail後full調用enq Node pred = tail;//獲取tail節點 if (pred != null) {//若是存在 node.prev = pred;//prev,volatile變量 if (compareAndSetTail(pred, node)) {//以cas方式(保證線程安全)設置尾節點,成功 pred.next = node;//設置兩個節點的雙向關係 node1<=>node2 return node; } } enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) {//自旋 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {//須要for自旋與cas操做 保證線程安全 t.next = node; return t; } } } } /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * 在獨佔模式中不可中斷地獲取鎖。也可用在條件隊列中 * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//能夠看到,這裏是一個自旋,acquire方法會「重複」在這裏,直到獲取鎖成功 final Node p = node.predecessor();//node.prev if (p == head && tryAcquire(arg)) {//若是node的前驅節點時頭結點,且獲取鎖成功 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * 檢查並更新status of node that fails to acquire. * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//make sure that pred is node.prev : ws是在for裏循環調用的,不用擔憂如今的值立刻會被修改 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. * node的前驅節點時signal,當pred作release操做時,會unpark node,因此node能夠安全地park */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. * 前驅節點已經取消,prev往前移 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. * waitStatus must be 0 or PROPAGATE.但不用當即park線程,在for裏再一次嘗試獲取鎖 * 能夠發現這個cas沒有直接包含在for循環裏(調用shouldParkAfterFailedAcquire會自旋),不在意一次的成敗 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * Convenience method to interrupt current thread. */ static void selfInterrupt() { Thread.currentThread().interrupt(); } /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
LockSupport,之後要重點看一下oop
總結一下:先tryAcquire,成功則結束。失敗,則先將當前線程包裝成獨佔節點node,並嘗試將node設爲tail,從而加入等待隊列(由於是多線程操做,直接加入等待隊列會有問題,經過單次地成爲tail,保證線程安全),若是沒設成tail(多線程的複雜性),在for循環裏設成tail。當加入隊列後,在for循環裏獲取鎖(本身成爲頭結點),若是沒有獲取鎖,根據實際狀況決定park,或者繼續嘗試獲取鎖。
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) {//if tryRelease success, Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * Wakes up node's successor, if one exists. * 喚醒後繼節點 * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//不在for循環裏,不要求成功 /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) {//waitStatus>0 it has canceled s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒在acquire裏park住的線程 }
若是tryRelease成功,喚醒後繼節點(unpark在acuireQueue裏park住的線程,讓其進行下一次的for循環,final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) )。
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * 共享模式獲取鎖,不響應中斷。失敗後,進入隊列,在blocking\unblocking之間徘徊,直到成功 * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * Acquires in shared uninterruptible mode. do...實際執行體 * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//建立共享節點,塞入隊列尾部 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//tryAcquire - boolean tryAcquireShared - int if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below 老head setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller,//調用者明示:傳播 * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) * 共享模式釋放鎖 signal後繼,並傳播。獨佔模式只signal後繼 */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {// if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //only signal 會喚醒後繼 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed 暗示:其餘線程獲取鎖(setHead) break; //此期間其他線程沒有獲取鎖,纔會結束 } } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }