AQS框架源碼分析-AbstractQueuedSynchronizer

前言:AQS框架在J.U.C中的地位不言而喻,能夠說沒有AQS就沒有J.U.C包,可見其重要性,所以有必要對其原理進行詳細深刻的理解。java

1.AQS是什麼

在深刻AQS以前,首先咱們要搞清楚什麼是AQS。AQS全稱是AbstractQueuedSynchronizer,咱們直接查看AQS源碼的註釋。node

大體意思就是說:AQS提供了實現阻塞鎖和相關同步器並依賴先進先出(FIFO)等待隊列的框架。數據結構

AQS依賴一個原子數值做爲鎖的狀態,子類能夠有多個狀態值,只能經過原子方法區操做該值,從而保證同步。多線程

經過第一段的註釋大體總結下AQS是什麼:併發

①AQS是一個同步的基礎框架,基於一個先進先出的隊列app

②鎖機制依賴一個原子值的狀態。框架

③AQS的子類負責定義與操做這個狀態值,但必須經過AQS提供的原子操做。ide

④AQS剩餘的方法就是圍繞隊列,與線程阻塞喚醒等功能。函數

2.重要成員變量

AQS中有兩個重要的成員變量:Node和ConditionObject。oop

①Node的做用是存儲獲取鎖失敗的線程,而且維護一個CLH FIFO隊列,該隊列是會被多線程操做的,因此Node中大部分變量都是被volatile修飾,而且經過自旋和CAS進行原子性的操做。CLH的數據結構以下:

Node有一個模式的屬性:獨佔模式共享模式,獨佔模式下資源是線程獨佔的,共享模式下,資源是能夠被多個線程佔用的。

Node源碼以下:

  1 static final class Node {
  2         /** Marker to indicate a node is waiting in shared mode */
  3         static final Node SHARED = new Node();  // 共享模式   4         /** Marker to indicate a node is waiting in exclusive mode */
  5         static final Node EXCLUSIVE = null;  // 獨佔模式   6 
  7         /** waitStatus value to indicate thread has cancelled */
  8         static final int CANCELLED =  1;  // 代表線程已處於結束狀態(被取消)   9         /** waitStatus value to indicate successor's thread needs unparking */
 10         static final int SIGNAL    = -1; // 代表線程須要被喚醒  11         /** waitStatus value to indicate thread is waiting on condition */
 12         static final int CONDITION = -2; // 代表線程正處於條件隊列上,等待某一條件  13         /**
 14          * waitStatus value to indicate the next acquireShared should
 15          * unconditionally propagate
 16          */
 17         static final int PROPAGATE = -3; // 共享模式下同步狀態會被傳播  18 
 19         /**
 20          * Status field, taking on only the values:
 21          *   SIGNAL:     The successor of this node is (or will soon be)
 22          *               blocked (via park), so the current node must
 23          *               unpark its successor when it releases or
 24          *               cancels. To avoid races, acquire methods must
 25          *               first indicate they need a signal,
 26          *               then retry the atomic acquire, and then,
 27          *               on failure, block.
 28          *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 29          *               Nodes never leave this state. In particular,
 30          *               a thread with cancelled node never again blocks.
 31          *   CONDITION:  This node is currently on a condition queue.
 32          *               It will not be used as a sync queue node
 33          *               until transferred, at which time the status
 34          *               will be set to 0. (Use of this value here has
 35          *               nothing to do with the other uses of the
 36          *               field, but simplifies mechanics.)
 37          *   PROPAGATE:  A releaseShared should be propagated to other
 38          *               nodes. This is set (for head node only) in
 39          *               doReleaseShared to ensure propagation
 40          *               continues, even if other operations have
 41          *               since intervened.
 42          *   0:          None of the above
 43          *
 44          * The values are arranged numerically to simplify use.
 45          * Non-negative values mean that a node doesn't need to
 46          * signal. So, most code doesn't need to check for particular
 47          * values, just for sign.
 48          *
 49          * The field is initialized to 0 for normal sync nodes, and
 50          * CONDITION for condition nodes.  It is modified using CAS
 51          * (or when possible, unconditional volatile writes).
 52          */
 53         volatile int waitStatus;
 54 
 55         /**
 56          * Link to predecessor node that current node/thread relies on
 57          * for checking waitStatus. Assigned during enqueuing, and nulled
 58          * out (for sake of GC) only upon dequeuing.  Also, upon
 59          * cancellation of a predecessor, we short-circuit while
 60          * finding a non-cancelled one, which will always exist
 61          * because the head node is never cancelled: A node becomes
 62          * head only as a result of successful acquire. A
 63          * cancelled thread never succeeds in acquiring, and a thread only
 64          * cancels itself, not any other node.
 65          */
 66         volatile Node prev;
 67 
 68         /**
 69          * Link to the successor node that the current node/thread
 70          * unparks upon release. Assigned during enqueuing, adjusted
 71          * when bypassing cancelled predecessors, and nulled out (for
 72          * sake of GC) when dequeued.  The enq operation does not
 73          * assign next field of a predecessor until after attachment,
 74          * so seeing a null next field does not necessarily mean that
 75          * node is at end of queue. However, if a next field appears
 76          * to be null, we can scan prev's from the tail to
 77          * double-check.  The next field of cancelled nodes is set to
 78          * point to the node itself instead of null, to make life
 79          * easier for isOnSyncQueue.
 80          */
 81         volatile Node next;
 82 
 83         /**
 84          * The thread that enqueued this node.  Initialized on
 85          * construction and nulled out after use.
 86          */
 87         volatile Thread thread;
 88 
 89         /**
 90          * Link to next node waiting on condition, or the special
 91          * value SHARED.  Because condition queues are accessed only
 92          * when holding in exclusive mode, we just need a simple
 93          * linked queue to hold nodes while they are waiting on
 94          * conditions. They are then transferred to the queue to
 95          * re-acquire. And because conditions can only be exclusive,
 96          * we save a field by using special value to indicate shared
 97          * mode.
 98          */
 99         Node nextWaiter;
100 
101         /**
102          * Returns true if node is waiting in shared mode.
103          */
104         final boolean isShared() {
105             return nextWaiter == SHARED;
106         }
107 
108         /**
109          * Returns previous node, or throws NullPointerException if null.
110          * Use when predecessor cannot be null.  The null check could
111          * be elided, but is present to help the VM.
112          *
113          * @return the predecessor of this node
114          */
115         final Node predecessor() throws NullPointerException {
116             Node p = prev;
117             if (p == null)
118                 throw new NullPointerException();
119             else
120                 return p;
121         }
122 
123         Node() {    // Used to establish initial head or SHARED marker
124         }
125         // 線程加入等待結點
126         Node(Thread thread, Node mode) {     // Used by addWaiter
127             this.nextWaiter = mode;
128             this.thread = thread;
129         }
130         // 線程加入條件對列,會帶上線程的狀態值waitStatus
131         Node(Thread thread, int waitStatus) { // Used by Condition
132             this.waitStatus = waitStatus;
133             this.thread = thread;
134         }
135     }

②ConditionObject:條件隊列,這個類的做用從AQS的註釋上可知。

該類主要是爲了讓子類實現獨佔模式。AQS框架下獨佔模式的獲取資源、釋放等操做到最後都是基於這個類實現的。只有在獨佔模式下才會去使用該類。

ConditionObject源碼以下(對主要代碼進行了註釋):

  1 public class ConditionObject implements Condition, java.io.Serializable {
  2         private static final long serialVersionUID = 1173984872572414699L;
  3         /** First node of condition queue. */
  4         private transient Node firstWaiter;  // 存儲條件對列中第一個節點
  5         /** Last node of condition queue. */
  6         private transient Node lastWaiter; // 存儲條件對列中最後一個節點
  7 
  8         /**
  9          * Creates a new {@code ConditionObject} instance.
 10          */
 11         public ConditionObject() { }
 12 
 13         // Internal methods
 14 
 15         /**
 16          * Adds a new waiter to wait queue.  // 增長一個新的節點到等待隊列中
 17          * @return its new wait node
 18          */
 19         private Node addConditionWaiter() {
 20             Node t = lastWaiter;
 21             // 若是最後一個節點的狀態已經結束,則直接清理掉
 22             // If lastWaiter is cancelled, clean out.
 23             if (t != null && t.waitStatus != Node.CONDITION) {
 24                // 拆分已經處於結束狀態的節點 也就是清除掉這類節點
 25                unlinkCancelledWaiters();
 26                t = lastWaiter;
 27             }
 28             // 建立一個新的節點,帶上結點狀態,代表結點處於條件對列上
 29             Node node = new Node(Thread.currentThread(), Node.CONDITION);
 30             /**
 31              條件隊列中加入節點都是從隊尾加入,而且從下面代碼可知,每次都會存儲最後一個節點的值。
 32              當最後一個節點爲空時,說明隊列中不存在節點,因此將node賦值給第一個節點,不然將節點加入對列尾
 33              */
 34             if (t == null)
 35                 firstWaiter = node;
 36             else
 37                 t.nextWaiter = node;
 38             lastWaiter = node;  // 存儲最後一個節點的值
 39             return node;
 40         }
 41 
 42         /**
 43          * 喚醒節點
 44          * 移除和轉換節點直到節點狀態處於未結束或者爲空 (節點移除至關於喚醒)
 45          * Removes and transfers nodes until hit non-cancelled one or
 46          * null. Split out from signal in part to encourage compilers
 47          * to inline the case of no waiters.
 48          * @param first (non-null) the first node on condition queue
 49          */
 50         private void doSignal(Node first) {
 51             do {
 52                 // 當next節點爲null時,則將lastWaiter賦值爲null
 53                 if ( (firstWaiter = first.nextWaiter) == null)
 54                     lastWaiter = null;
 55                 first.nextWaiter = null; // 切斷當前節點
 56             } while (!transferForSignal(first) &&
 57                      (first = firstWaiter) != null);
 58         }
 59 
 60         /**
 61          * 喚醒全部節點
 62          * Removes and transfers all nodes.
 63          * @param first (non-null) the first node on condition queue
 64          */
 65         private void doSignalAll(Node first) {
 66             lastWaiter = firstWaiter = null;
 67             do {
 68                 // 循環喚醒全部節點,代碼仍是比較容易理解
 69                 // 將每一個節點直接截斷便可
 70                 Node next = first.nextWaiter;
 71                 first.nextWaiter = null;
 72                 transferForSignal(first);
 73                 first = next;
 74             } while (first != null);
 75         }
 76 
 77         /**
 78          * Unlinks cancelled waiter nodes from condition queue.
 79          * Called only while holding lock. This is called when
 80          * cancellation occurred during condition wait, and upon
 81          * insertion of a new waiter when lastWaiter is seen to have
 82          * been cancelled. This method is needed to avoid garbage
 83          * retention in the absence of signals. So even though it may
 84          * require a full traversal, it comes into play only when
 85          * timeouts or cancellations occur in the absence of
 86          * signals. It traverses all nodes rather than stopping at a
 87          * particular target to unlink all pointers to garbage nodes
 88          * without requiring many re-traversals during cancellation
 89          * storms.
 90          */
 91         private void unlinkCancelledWaiters() { // 刪除處於結束狀態的節點
 92             Node t = firstWaiter;
 93             Node trail = null;
 94             // 第一個節點爲空,直接返回
 95             // 這裏會遍歷全部節點
 96             while (t != null) {
 97                 Node next = t.nextWaiter; // 記錄下一個節點的值
 98                 // 當節點狀態不爲CONDITION
 99                 if (t.waitStatus != Node.CONDITION) {
100                     // 首先將當前節點的下一個節點賦值爲空,切斷當前節點鏈路
101                     t.nextWaiter = null;
102                     // 若是追蹤節點爲空的時候,則存儲第一個節點的值爲next,由於當前節點狀態不爲CONDITION須要清理
103                     if (trail == null)
104                         firstWaiter = next;
105                     else  // 在追蹤節點串聯下一個節點,主要是爲了存儲最後一個節點的值
106                         trail.nextWaiter = next;
107                     if (next == null)  // 當next爲空時,則存儲trail爲最後一個節點,將最後一個節點值存儲下來
108                         lastWaiter = trail;
109                 }
110                 else  // 當節點狀態爲CONDITION時,將該節點賦值給trail
111                     trail = t;
112                 t = next; // 將next賦值給t,繼續遍歷
113             }
114         }
115 
116         // public methods
117 
118         /**
119          * 喚醒等待時間最長的節點,使其擁有鎖
120          * Moves the longest-waiting thread, if one exists, from the
121          * wait queue for this condition to the wait queue for the
122          * owning lock.
123          *
124          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
125          *         returns {@code false}
126          */
127         public final void signal() {
128             // 若是線程不是獨佔資源,則拋出異常,從這裏也說明ConditionObject只能用在獨佔模式中
129             if (!isHeldExclusively())
130                 throw new IllegalMonitorStateException();
131             Node first = firstWaiter;
132             if (first != null)
133                 doSignal(first);
134         }
135 
136         /**
137          * 喚醒全部等待節點
138          * Moves all threads from the wait queue for this condition to
139          * the wait queue for the owning lock.
140          *
141          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
142          *         returns {@code false}
143          */
144         public final void signalAll() {
145             if (!isHeldExclusively())
146                 throw new IllegalMonitorStateException();
147             Node first = firstWaiter;
148             if (first != null)
149                 doSignalAll(first);
150         }
151 
152         /**
153          * 節點不間斷等待
154          * Implements uninterruptible condition wait.
155          * <ol>
156          * <li> Save lock state returned by {@link #getState}.
157          * <li> Invoke {@link #release} with saved state as argument,
158          *      throwing IllegalMonitorStateException if it fails.
159          * <li> Block until signalled.
160          * <li> Reacquire by invoking specialized version of
161          *      {@link #acquire} with saved state as argument.
162          * </ol>
163          */
164         public final void awaitUninterruptibly() {
165             Node node = addConditionWaiter();
166             int savedState = fullyRelease(node);
167             boolean interrupted = false;
168             while (!isOnSyncQueue(node)) {
169                 LockSupport.park(this);
170                 if (Thread.interrupted())
171                     interrupted = true;
172             }
173             if (acquireQueued(node, savedState) || interrupted)
174                 selfInterrupt();
175         }
176         
177         /*
178          * For interruptible waits, we need to track whether to throw
179          * InterruptedException, if interrupted while blocked on
180          * condition, versus reinterrupt current thread, if
181          * interrupted while blocked waiting to re-acquire.
182          */
183 
184         /** Mode meaning to reinterrupt on exit from wait */
185         private static final int REINTERRUPT =  1;
186         /** Mode meaning to throw InterruptedException on exit from wait */
187         private static final int THROW_IE    = -1;
188 
189         /**
190          * Checks for interrupt, returning THROW_IE if interrupted
191          * before signalled, REINTERRUPT if after signalled, or
192          * 0 if not interrupted.
193          */
194         private int checkInterruptWhileWaiting(Node node) {
195             return Thread.interrupted() ?
196                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
197                 0;
198         }
199 
200         /**
201          * Throws InterruptedException, reinterrupts current thread, or
202          * does nothing, depending on mode.
203          */
204         private void reportInterruptAfterWait(int interruptMode)
205             throws InterruptedException {
206             if (interruptMode == THROW_IE)
207                 throw new InterruptedException();
208             else if (interruptMode == REINTERRUPT)
209                 selfInterrupt();
210         }
211 
212         /**
213          * Implements interruptible condition wait.
214          * <ol>
215          * <li> If current thread is interrupted, throw InterruptedException.
216          * <li> Save lock state returned by {@link #getState}.
217          * <li> Invoke {@link #release} with saved state as argument,
218          *      throwing IllegalMonitorStateException if it fails.
219          * <li> Block until signalled or interrupted.
220          * <li> Reacquire by invoking specialized version of
221          *      {@link #acquire} with saved state as argument.
222          * <li> If interrupted while blocked in step 4, throw InterruptedException.
223          * </ol>
224          */
225         public final void await() throws InterruptedException {
226             if (Thread.interrupted())
227                 throw new InterruptedException();
228             Node node = addConditionWaiter();
229             int savedState = fullyRelease(node);
230             int interruptMode = 0;
231             while (!isOnSyncQueue(node)) {
232                 LockSupport.park(this);
233                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
234                     break;
235             }
236             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
237                 interruptMode = REINTERRUPT;
238             if (node.nextWaiter != null) // clean up if cancelled
239                 unlinkCancelledWaiters();
240             if (interruptMode != 0)
241                 reportInterruptAfterWait(interruptMode);
242         }
243 
244         /**
245          * Implements timed condition wait.
246          * <ol>
247          * <li> If current thread is interrupted, throw InterruptedException.
248          * <li> Save lock state returned by {@link #getState}.
249          * <li> Invoke {@link #release} with saved state as argument,
250          *      throwing IllegalMonitorStateException if it fails.
251          * <li> Block until signalled, interrupted, or timed out.
252          * <li> Reacquire by invoking specialized version of
253          *      {@link #acquire} with saved state as argument.
254          * <li> If interrupted while blocked in step 4, throw InterruptedException.
255          * </ol>
256          */
257         public final long awaitNanos(long nanosTimeout)
258                 throws InterruptedException {
259             if (Thread.interrupted())
260                 throw new InterruptedException();
261             Node node = addConditionWaiter();
262             int savedState = fullyRelease(node);
263             final long deadline = System.nanoTime() + nanosTimeout;
264             int interruptMode = 0;
265             while (!isOnSyncQueue(node)) {
266                 if (nanosTimeout <= 0L) {
267                     transferAfterCancelledWait(node);
268                     break;
269                 }
270                 if (nanosTimeout >= spinForTimeoutThreshold)
271                     LockSupport.parkNanos(this, nanosTimeout);
272                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
273                     break;
274                 nanosTimeout = deadline - System.nanoTime();
275             }
276             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
277                 interruptMode = REINTERRUPT;
278             if (node.nextWaiter != null)
279                 unlinkCancelledWaiters();
280             if (interruptMode != 0)
281                 reportInterruptAfterWait(interruptMode);
282             return deadline - System.nanoTime();
283         }
284 
285         /**
286          * Implements absolute timed condition wait.
287          * <ol>
288          * <li> If current thread is interrupted, throw InterruptedException.
289          * <li> Save lock state returned by {@link #getState}.
290          * <li> Invoke {@link #release} with saved state as argument,
291          *      throwing IllegalMonitorStateException if it fails.
292          * <li> Block until signalled, interrupted, or timed out.
293          * <li> Reacquire by invoking specialized version of
294          *      {@link #acquire} with saved state as argument.
295          * <li> If interrupted while blocked in step 4, throw InterruptedException.
296          * <li> If timed out while blocked in step 4, return false, else true.
297          * </ol>
298          */
299         public final boolean awaitUntil(Date deadline)
300                 throws InterruptedException {
301             long abstime = deadline.getTime();
302             if (Thread.interrupted())
303                 throw new InterruptedException();
304             Node node = addConditionWaiter();
305             int savedState = fullyRelease(node);
306             boolean timedout = false;
307             int interruptMode = 0;
308             while (!isOnSyncQueue(node)) {
309                 if (System.currentTimeMillis() > abstime) {
310                     timedout = transferAfterCancelledWait(node);
311                     break;
312                 }
313                 LockSupport.parkUntil(this, abstime);
314                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
315                     break;
316             }
317             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
318                 interruptMode = REINTERRUPT;
319             if (node.nextWaiter != null)
320                 unlinkCancelledWaiters();
321             if (interruptMode != 0)
322                 reportInterruptAfterWait(interruptMode);
323             return !timedout;
324         }
325 
326         /**
327          * Implements timed condition wait.
328          * <ol>
329          * <li> If current thread is interrupted, throw InterruptedException.
330          * <li> Save lock state returned by {@link #getState}.
331          * <li> Invoke {@link #release} with saved state as argument,
332          *      throwing IllegalMonitorStateException if it fails.
333          * <li> Block until signalled, interrupted, or timed out.
334          * <li> Reacquire by invoking specialized version of
335          *      {@link #acquire} with saved state as argument.
336          * <li> If interrupted while blocked in step 4, throw InterruptedException.
337          * <li> If timed out while blocked in step 4, return false, else true.
338          * </ol>
339          */
340         public final boolean await(long time, TimeUnit unit)
341                 throws InterruptedException {
342             long nanosTimeout = unit.toNanos(time);
343             if (Thread.interrupted())
344                 throw new InterruptedException();
345             Node node = addConditionWaiter();
346             int savedState = fullyRelease(node);
347             final long deadline = System.nanoTime() + nanosTimeout;
348             boolean timedout = false;
349             int interruptMode = 0;
350             while (!isOnSyncQueue(node)) {
351                 if (nanosTimeout <= 0L) {
352                     timedout = transferAfterCancelledWait(node);
353                     break;
354                 }
355                 if (nanosTimeout >= spinForTimeoutThreshold)
356                     LockSupport.parkNanos(this, nanosTimeout);
357                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
358                     break;
359                 nanosTimeout = deadline - System.nanoTime();
360             }
361             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
362                 interruptMode = REINTERRUPT;
363             if (node.nextWaiter != null)
364                 unlinkCancelledWaiters();
365             if (interruptMode != 0)
366                 reportInterruptAfterWait(interruptMode);
367             return !timedout;
368         }
369 
370         //  support for instrumentation
371 
372         /**
373          * Returns true if this condition was created by the given
374          * synchronization object.
375          *
376          * @return {@code true} if owned
377          */
378         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
379             return sync == AbstractQueuedSynchronizer.this;
380         }
381 
382         /**
383          * Queries whether any threads are waiting on this condition.
384          * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
385          *
386          * @return {@code true} if there are any waiting threads
387          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
388          *         returns {@code false}
389          */
390         protected final boolean hasWaiters() {
391             if (!isHeldExclusively())
392                 throw new IllegalMonitorStateException();
393             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
394                 if (w.waitStatus == Node.CONDITION)
395                     return true;
396             }
397             return false;
398         }
399 
400         /**
401          * Returns an estimate of the number of threads waiting on
402          * this condition.
403          * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
404          *
405          * @return the estimated number of waiting threads
406          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
407          *         returns {@code false}
408          */
409         protected final int getWaitQueueLength() {
410             if (!isHeldExclusively())
411                 throw new IllegalMonitorStateException();
412             int n = 0;
413             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
414                 if (w.waitStatus == Node.CONDITION)
415                     ++n;
416             }
417             return n;
418         }
419 
420         /**
421          * Returns a collection containing those threads that may be
422          * waiting on this Condition.
423          * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
424          *
425          * @return the collection of threads
426          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
427          *         returns {@code false}
428          */
429         protected final Collection<Thread> getWaitingThreads() {
430             if (!isHeldExclusively())
431                 throw new IllegalMonitorStateException();
432             ArrayList<Thread> list = new ArrayList<Thread>();
433             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
434                 if (w.waitStatus == Node.CONDITION) {
435                     Thread t = w.thread;
436                     if (t != null)
437                         list.add(t);
438                 }
439             }
440             return list;
441         }
442     }
View Code

3.AQS成員函數

因爲AQS分獨佔模式和共享模式,所以這裏按獨佔、共享模式的順序對AQS的成員函數進行分析。

①acquire(int arg)

獨佔模式下獲取資源,若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。源碼以下:

 1  /**
 2      * Acquires in exclusive mode, ignoring interrupts.  Implemented
 3      * by invoking at least once {@link #tryAcquire},
 4      * returning on success.  Otherwise the thread is queued, possibly
 5      * repeatedly blocking and unblocking, invoking {@link
 6      * #tryAcquire} until success.  This method can be used
 7      * to implement method {@link Lock#lock}.
 8      *
 9      * @param arg the acquire argument.  This value is conveyed to
10      *        {@link #tryAcquire} but is otherwise uninterpreted and
11      *        can represent anything you like.
12      */
13     public final void acquire(int arg) {
14         if (!tryAcquire(arg) &&
15             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16             selfInterrupt();
17     }

該函數執行流程:

A.若是tryAcquire()成功獲取資源,則直接返回。

B.直接獲取資源失敗,則經過addWaiter()將線程加入隊列尾,並標記爲獨佔模式。

C.經過acquireQueued()讓線程在等待隊列中獲取資源,經過自旋方式,一直獲取到後才返回。若是在等待過程當中被中斷過,則返回true,不然返回false。

D.若是線程在等待獲取資源的過程當中被中斷,只有在獲取到資源後纔會去響應,執行selfInterrupt進行自我中斷。

#1.tryAcquire(int)

該方法是在獨佔模式下獲取資源,成功-ture,失敗-false。

1  protected boolean tryAcquire(int arg) {
2         throw new UnsupportedOperationException();
3     }

直接調用該方法會拋出異常,由於AQS只是一個框架,只是定義該接口,具體實現需在子類中實現。

#2.addWaiter(Node mode)

將當前線程加入等待隊列的隊尾,並返回當前線程所在的節點。

 1 private Node addWaiter(Node mode) {
 2        // 建立節點,以獨佔模式       
 3        Node node = new Node(Thread.currentThread(), mode);
 4         // Try the fast path of enq; backup to full enq on failure
 5         // 嘗試將節點快速放入隊尾
 6         Node pred = tail;
 7         if (pred != null) {
 8             node.prev = pred;
 9             // 主要經過CAS入隊尾
10             if (compareAndSetTail(pred, node)) {
11                 pred.next = node;
12                 return node;
13             }
14         }
15         // 若是快速入隊尾失敗,則經過enq方式入對尾
16         enq(node);
17         return node;
18     }

CAS操做後面討論,這裏先看enq(final Node node)入隊尾操做。

 1 private Node enq(final Node node) {
 2         // 這裏是CAS的「自旋」操做,直到將節點成功加入隊尾
 3         for (;;) {
 4             Node t = tail;
 5             // 由於每次入隊都是從隊尾加入,當隊尾爲null,則代表隊列爲null,則需初始化頭結點
 6             // 並將尾節點也指向頭節點
 7             if (t == null) { // Must initialize   
 8                 if (compareAndSetHead(new Node()))
 9                     tail = head;
10             } else {  // 經過CAS入隊尾,自旋操做
11                 node.prev = t;
12                 if (compareAndSetTail(t, node)) {
13                     t.next = node;
14                     return t;
15                 }
16             }
17         }
18     }

在線程入隊尾後,就須要acquireQueued函數了,該函數的做用是讓線程拿到資源,固然仍是經過自旋的方式來拿資源,也是就是一個排隊的過程。

 1 final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true; // 標記是否成功拿到資源
 3         try {
 4             boolean interrupted = false; // 標記在等待過程當中是否被中斷過
 5             // 自旋操做
 6             for (;;) {
 7                 final Node p = node.predecessor(); // 拿到當前節點的前向節點
 8                 // 若是前向節點爲head,則代表當前節點排在第二位了,已經獲得獲取資源的資格
 9                 if (p == head && tryAcquire(arg)) {
10                     // 成功拿到資源後,將head節點指向當前節點
11                     // 從這裏能夠看出,head節點就是當前獲取到鎖的節點
12                     setHead(node); 
13                     // 將原來head節點的next設置爲null,方便GC回收之前的head節點,也就意味着以前拿到鎖的節點出隊列了
14                     p.next = null; // help GC
15                     failed = false;
16                     return interrupted;  // 返回在排隊過程當中線程是否被中斷過
17                 }
18                 // 到這裏,代表線程處於等待狀態,自旋直到被unpark
19                 if (shouldParkAfterFailedAcquire(p, node) &&
20                     parkAndCheckInterrupt())
21                     interrupted = true;
22             }
23         } finally {
24             if (failed) // 獲取資源失敗,則將節點標記爲結束狀態
25                 cancelAcquire(node);
26         }
27     }

在線程排隊等待的過程當中,有兩個關鍵函數shouldParkAfterFailedAcquire(Node pred, Node node)和parkAndCheckInterrupt()。

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2         int ws = pred.waitStatus; // 前驅節點的狀態
 3         if (ws == Node.SIGNAL)
 4            // 若是前驅節點正處於被喚醒的狀態,則正常排隊等待便可
 5             /*
 6              * This node has already set status asking a release
 7              * to signal it, so it can safely park.
 8              */
 9             return true;
10         if (ws > 0) {  // 前驅節點處於結束狀態
11             /*
12              * Predecessor was cancelled. Skip over predecessors and
13              * indicate retry.
14              */
15             /*
16              *繼續向下找,一直找處處於正常等待狀態的節點,將當前節點插入其後,其餘
17              *無用節點造成一個鏈,會被GC
18              */
19             do {
20                 node.prev = pred = pred.prev;
21             } while (pred.waitStatus > 0);
22             pred.next = node;
23         } else {
24             /*
25              * waitStatus must be 0 or PROPAGATE.  Indicate that we
26              * need a signal, but don't park yet.  Caller will need to
27              * retry to make sure it cannot acquire before parking.
28              */
29             // 前驅節點狀態正常,則把前驅節點的狀態設置爲SIGNAL,這樣前驅節點拿到資源後,可通知下當前節點
30             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
31         }
32         return false;
33     }

分析以上源碼可知:只有當前驅節點的狀態爲SIGNAL時,當前節點才能正常排隊等待,不然需找到一個合適的節點next位置來進行排隊等待。

1   private final boolean parkAndCheckInterrupt() {
2         // 使線程進入waitting狀態
3         LockSupport.park(this);
4         return Thread.interrupted(); // 返回線程是否被中斷過
5     }

該函數做用:當節點正常進入排隊後,讓線程進入等待狀態。

至此acquireQueued()函數總結完成,該函數的具體執行流程:

#1.首先檢查節點是否能夠當即獲取資源。

#2.若是不能當即獲取資源,則進行排隊,這裏須要找到正確的排隊點,直到unpark或interrupt喚醒本身。

#3.喚醒後,判斷本身是否有資格獲取資源,若是拿到資源,則將head指向當前節點,並返回在等待過程是否被中斷過,若是沒拿到資源,則繼續流程2。

acquire小結

到這裏acquire(int)函數分析結束,這個函數很是重要,這裏再貼上源碼:

1   public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

#1.調用子類的tryAcquire直接獲取資源,若是成功則返回。

#2.若是流程1失敗,則將線程加入等待隊列的隊尾(獨佔模式)。

#3.在acquireQueued中排隊,經過自旋獲取資源,直到獲取資源才返回。若是在排隊過程當中線程被中斷過返回true,不然返回false。

#4.在排隊過程當中被中斷是不響應的,只有獲取到資源後,才進行自我中斷,補上中斷標記。

整個過程的流程圖以下:

②release(int)獨佔模式釋放資源。

 1  public final boolean release(int arg) {
 2         // 嘗試釋放資源
 3         if (tryRelease(arg)) {
 4             Node h = head; 
 5             if (h != null && h.waitStatus != 0)
 6                 unparkSuccessor(h); // 喚醒隊列中下一個線程
 7             return true;
 8         }
 9         return false;
10     }

釋放鎖的函數很簡單,經過tryRelease嘗試釋放資源,而後喚醒隊列中的其餘線程。

tryRelease(int):

1    protected boolean tryRelease(int arg) {
2         throw new UnsupportedOperationException();
3     }

與tryAcquire函數同樣,該方法須要子類去實現,若是直接調用會拋異常。

unparkSuccessor(Node node):

喚醒等待隊列中的下一個線程,這裏喚醒的是等待隊列中最前邊那個未放棄的線程,注意看代碼註釋。

 1  private void unparkSuccessor(Node node) {
 2         /*
 3          * If status is negative (i.e., possibly needing signal) try
 4          * to clear in anticipation of signalling.  It is OK if this
 5          * fails or if status is changed by waiting thread.
 6          */
 7         int ws = node.waitStatus; // 獲取當前線程的狀態
 8         if (ws < 0) // 若是當前線程狀態處於可用狀態,則直接將狀態值置0
 9             compareAndSetWaitStatus(node, ws, 0);
10 
11         /*
12          * Thread to unpark is held in successor, which is normally
13          * just the next node.  But if cancelled or apparently null,
14          * traverse backwards from tail to find the actual
15          * non-cancelled successor.
16          */
17         Node s = node.next; // 下一個節點
18         if (s == null || s.waitStatus > 0) {  // 若是節點爲null或節點已處於結束狀態
19             s = null;
20             // 從隊列尾向前遍歷,找到next可用的節點,狀態小於0就可用,這裏的節點是隊列中最前邊的可用節點
21             for (Node t = tail; t != null && t != node; t = t.prev)
22                 if (t.waitStatus <= 0)  
23                     s = t;
24         }
25         if (s != null)
26             LockSupport.unpark(s.thread);// 喚醒next線程
27     }

獨佔模式的主要函數分析完畢,接下來看共享模式。

acquireShared(int)

共享模式下獲取資源,若是成功則直接返回,不然進入等待隊列,經過自旋直到獲取資源爲止。

1 public final void acquireShared(int arg) {
2         // 共享模式下獲取資源,若是獲取失敗,則進入等待隊列
3         // 一樣該函數須要子類去實現
4         if (tryAcquireShared(arg) < 0)
5             doAcquireShared(arg);  // 進入等待隊列直到鎖獲取到爲止
6     }

tryAcquireShared(int)函數返回值,須要注意下:

負數:表示獲取失敗;

0:獲取成功,但沒有剩餘資源;

正數:獲取成功,且有剩餘資源;

#1.doAcquireShared(int)

將線程加入隊列尾,而後經過自旋獲取資源,直到獲得資源才返回。

 1  private void doAcquireShared(int arg) {
 2         final Node node = addWaiter(Node.SHARED); // 將線程加入隊尾,經過共享模式
 3         boolean failed = true;// 是否成功
 4         try {
 5             boolean interrupted = false; // 在自旋過程當中是否被中斷過
 6             for (;;) {
 7                 final Node p = node.predecessor(); // 前驅節點
 8                 if (p == head) {  // 這裏代表當前節點處於head的next位,此時node被喚醒,極可能是head用完來喚醒
 9                     int r = tryAcquireShared(arg); // 獲取資源
10                     if (r >= 0) { // 成功
11                         setHeadAndPropagate(node, r);// 將head指向本身,還有剩餘資源可用的話再喚醒以後的線程
12                         p.next = null; // help GC 無用鏈,幫助GC
13                         if (interrupted)  // 若是等待過程當中被中斷過,將中斷補上
14                             selfInterrupt();
15                         failed = false;
16                         return;
17                     }
18                 }
19                 // 線程未排在head以後,繼續排隊,進入waiting狀態,等着unpark
20                 if (shouldParkAfterFailedAcquire(p, node) &&
21                     parkAndCheckInterrupt())
22                     interrupted = true;  // 中斷標記
23             }
24         } finally {
25             if (failed)
26                 cancelAcquire(node);
27         }
28     }

整個流程與獨佔模式的acquireQueued很類似,只是共享模式下,在喚醒本身後,若是還有剩餘資源,須要喚醒後續節點。

setHeadAndPropagate(node, int)

將head節點設置爲當前節點,若是還有剩餘資源,則喚醒下一個線程。

 1  private void setHeadAndPropagate(Node node, int propagate) {
 2         Node h = head; // Record old head for check below
 3         setHead(node); // 將隊列中的head執行當前節點
 4         /*
 5          * Try to signal next queued node if:
 6          *   Propagation was indicated by caller,
 7          *     or was recorded (as h.waitStatus either before
 8          *     or after setHead) by a previous operation
 9          *     (note: this uses sign-check of waitStatus because
10          *      PROPAGATE status may transition to SIGNAL.)
11          * and
12          *   The next node is waiting in shared mode,
13          *     or we don't know, because it appears null
14          *
15          * The conservatism in both of these checks may cause
16          * unnecessary wake-ups, but only when there are multiple
17          * racing acquires/releases, so most need signals now or soon
18          * anyway.
19          */
20          // 若是還有剩餘資源,則喚醒後續線程
21         if (propagate > 0 || h == null || h.waitStatus < 0 ||
22             (h = head) == null || h.waitStatus < 0) {
23             Node s = node.next;
24             if (s == null || s.isShared())
25                 doReleaseShared();
26         }
27     }

這裏除了將head設置成當前線程,若是有剩餘資源,須要喚醒後續節點。

doReleaseShared()

 1  private void doReleaseShared() {
 2         /*
 3          * Ensure that a release propagates, even if there are other
 4          * in-progress acquires/releases.  This proceeds in the usual
 5          * way of trying to unparkSuccessor of head if it needs
 6          * signal. But if it does not, status is set to PROPAGATE to
 7          * ensure that upon release, propagation continues.
 8          * Additionally, we must loop in case a new node is added
 9          * while we are doing this. Also, unlike other uses of
10          * unparkSuccessor, we need to know if CAS to reset status
11          * fails, if so rechecking.
12          */
13         // 自旋操做
14         for (;;) {
15             Node h = head;
16             if (h != null && h != tail) {
17                 int ws = h.waitStatus;
18                 if (ws == Node.SIGNAL) { // 若是head狀態爲SIGNAL,則需喚醒後續節點
19                     // CAS一下當前節點的狀態,判斷是否爲SIGNAL,若是是則置爲0,不然繼續循環
20                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
21                         continue;            // loop to recheck cases
22                     unparkSuccessor(h); // 喚醒後繼節點
23                 }
24                 // 若是head節點狀態爲0,且CAS置爲傳播狀態失敗,則繼續循環,由於if操做中會改變節點的狀態
25                 else if (ws == 0 &&
26                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
27                     continue;                // loop on failed CAS
28             }
29             if (h == head) // 若是head節點發生了改變,則繼續自旋操做,防止上述操做過程當中添加了節點的狀況                  // loop if head changed
30                 break;
31         }
32     }

該方法的做用主要是用於喚醒後續節點。

共享模式獲取鎖操做與獨佔模式基本相同:先直接獲取資源,若是成功,直接返回;若是失敗,則將線程加入等待隊列尾,直到獲取到資源才返回,整個過程忽略中斷。不一樣點在於共享模式下本身拿到資源後,還須要喚醒後續節點。

#2.releaseShared(int)

同享模式下釋放資源

1 public final boolean releaseShared(int arg) {
2         if (tryReleaseShared(arg)) { // 嘗試釋放資源
3             doReleaseShared(); // 喚醒後續節點,前面已經分析
4             return true;
5         }
6         return false;
7     }

共享模式釋放資源與獨佔模式相似,可是獨佔模式下須要徹底釋放資源後,纔會返回true,而共享模式沒有這種要求。

總結

這裏只是對AQS的頂層框架進行了簡要的分析,具體須要深刻其子類中去,AQS的子類按模式分類可聚合成如下幾類:

#1.獨佔模式:

ReentrantLock:可重入鎖。state=0獨佔鎖,或者同一線程可屢次獲取鎖(獲取+1,釋放-1)。
Worker(java.util.concurrent.ThreadPoolExecutor類中的內部類)線程池類。shutdown關閉空閒工做線程,中斷worker工做線程是獨佔的,互斥的。

#2.共享模式:
Semaphore:信號量。 控制同時有多少個線程能夠進入代碼段。(互斥鎖的拓展)
CountDownLatch:倒計時器。  初始化一個值,多線程減小這個值,直到爲0,倒計時完畢,執行後續代碼。

#3.獨佔+共享模式:
ReentrantReadWriteLock:可重入讀寫鎖。獨佔寫+共享讀,即併發讀,互斥寫。

後續對這些類進行詳細分析。


by Shawn Chen,2019.1.29日,下午。

相關文章
相關標籤/搜索