前言:AQS框架在J.U.C中的地位不言而喻,能夠說沒有AQS就沒有J.U.C包,可見其重要性,所以有必要對其原理進行詳細深刻的理解。java
在深刻AQS以前,首先咱們要搞清楚什麼是AQS。AQS全稱是AbstractQueuedSynchronizer,咱們直接查看AQS源碼的註釋。node
大體意思就是說:AQS提供了實現阻塞鎖和相關同步器並依賴先進先出(FIFO)等待隊列的框架。數據結構
AQS依賴一個原子數值做爲鎖的狀態,子類能夠有多個狀態值,只能經過原子方法區操做該值,從而保證同步。多線程
經過第一段的註釋大體總結下AQS是什麼:併發
①AQS是一個同步的基礎框架,基於一個先進先出的隊列。app
②鎖機制依賴一個原子值的狀態。框架
③AQS的子類負責定義與操做這個狀態值,但必須經過AQS提供的原子操做。ide
④AQS剩餘的方法就是圍繞隊列,與線程阻塞喚醒等功能。函數
AQS中有兩個重要的成員變量:Node和ConditionObject。oop
①Node的做用是存儲獲取鎖失敗的線程,而且維護一個CLH FIFO隊列,該隊列是會被多線程操做的,因此Node中大部分變量都是被volatile修飾,而且經過自旋和CAS進行原子性的操做。CLH的數據結構以下:
Node有一個模式的屬性:獨佔模式和共享模式,獨佔模式下資源是線程獨佔的,共享模式下,資源是能夠被多個線程佔用的。
Node源碼以下:
1 static final class Node { 2 /** Marker to indicate a node is waiting in shared mode */ 3 static final Node SHARED = new Node(); // 共享模式 4 /** Marker to indicate a node is waiting in exclusive mode */ 5 static final Node EXCLUSIVE = null; // 獨佔模式 6 7 /** waitStatus value to indicate thread has cancelled */ 8 static final int CANCELLED = 1; // 代表線程已處於結束狀態(被取消) 9 /** waitStatus value to indicate successor's thread needs unparking */ 10 static final int SIGNAL = -1; // 代表線程須要被喚醒 11 /** waitStatus value to indicate thread is waiting on condition */ 12 static final int CONDITION = -2; // 代表線程正處於條件隊列上,等待某一條件 13 /** 14 * waitStatus value to indicate the next acquireShared should 15 * unconditionally propagate 16 */ 17 static final int PROPAGATE = -3; // 共享模式下同步狀態會被傳播 18 19 /** 20 * Status field, taking on only the values: 21 * SIGNAL: The successor of this node is (or will soon be) 22 * blocked (via park), so the current node must 23 * unpark its successor when it releases or 24 * cancels. To avoid races, acquire methods must 25 * first indicate they need a signal, 26 * then retry the atomic acquire, and then, 27 * on failure, block. 28 * CANCELLED: This node is cancelled due to timeout or interrupt. 29 * Nodes never leave this state. In particular, 30 * a thread with cancelled node never again blocks. 31 * CONDITION: This node is currently on a condition queue. 32 * It will not be used as a sync queue node 33 * until transferred, at which time the status 34 * will be set to 0. (Use of this value here has 35 * nothing to do with the other uses of the 36 * field, but simplifies mechanics.) 37 * PROPAGATE: A releaseShared should be propagated to other 38 * nodes. This is set (for head node only) in 39 * doReleaseShared to ensure propagation 40 * continues, even if other operations have 41 * since intervened. 42 * 0: None of the above 43 * 44 * The values are arranged numerically to simplify use. 45 * Non-negative values mean that a node doesn't need to 46 * signal. So, most code doesn't need to check for particular 47 * values, just for sign. 48 * 49 * The field is initialized to 0 for normal sync nodes, and 50 * CONDITION for condition nodes. It is modified using CAS 51 * (or when possible, unconditional volatile writes). 52 */ 53 volatile int waitStatus; 54 55 /** 56 * Link to predecessor node that current node/thread relies on 57 * for checking waitStatus. Assigned during enqueuing, and nulled 58 * out (for sake of GC) only upon dequeuing. Also, upon 59 * cancellation of a predecessor, we short-circuit while 60 * finding a non-cancelled one, which will always exist 61 * because the head node is never cancelled: A node becomes 62 * head only as a result of successful acquire. A 63 * cancelled thread never succeeds in acquiring, and a thread only 64 * cancels itself, not any other node. 65 */ 66 volatile Node prev; 67 68 /** 69 * Link to the successor node that the current node/thread 70 * unparks upon release. Assigned during enqueuing, adjusted 71 * when bypassing cancelled predecessors, and nulled out (for 72 * sake of GC) when dequeued. The enq operation does not 73 * assign next field of a predecessor until after attachment, 74 * so seeing a null next field does not necessarily mean that 75 * node is at end of queue. However, if a next field appears 76 * to be null, we can scan prev's from the tail to 77 * double-check. The next field of cancelled nodes is set to 78 * point to the node itself instead of null, to make life 79 * easier for isOnSyncQueue. 80 */ 81 volatile Node next; 82 83 /** 84 * The thread that enqueued this node. Initialized on 85 * construction and nulled out after use. 86 */ 87 volatile Thread thread; 88 89 /** 90 * Link to next node waiting on condition, or the special 91 * value SHARED. Because condition queues are accessed only 92 * when holding in exclusive mode, we just need a simple 93 * linked queue to hold nodes while they are waiting on 94 * conditions. They are then transferred to the queue to 95 * re-acquire. And because conditions can only be exclusive, 96 * we save a field by using special value to indicate shared 97 * mode. 98 */ 99 Node nextWaiter; 100 101 /** 102 * Returns true if node is waiting in shared mode. 103 */ 104 final boolean isShared() { 105 return nextWaiter == SHARED; 106 } 107 108 /** 109 * Returns previous node, or throws NullPointerException if null. 110 * Use when predecessor cannot be null. The null check could 111 * be elided, but is present to help the VM. 112 * 113 * @return the predecessor of this node 114 */ 115 final Node predecessor() throws NullPointerException { 116 Node p = prev; 117 if (p == null) 118 throw new NullPointerException(); 119 else 120 return p; 121 } 122 123 Node() { // Used to establish initial head or SHARED marker 124 } 125 // 線程加入等待結點 126 Node(Thread thread, Node mode) { // Used by addWaiter 127 this.nextWaiter = mode; 128 this.thread = thread; 129 } 130 // 線程加入條件對列,會帶上線程的狀態值waitStatus 131 Node(Thread thread, int waitStatus) { // Used by Condition 132 this.waitStatus = waitStatus; 133 this.thread = thread; 134 } 135 }
②ConditionObject:條件隊列,這個類的做用從AQS的註釋上可知。
該類主要是爲了讓子類實現獨佔模式。AQS框架下獨佔模式的獲取資源、釋放等操做到最後都是基於這個類實現的。只有在獨佔模式下才會去使用該類。
ConditionObject源碼以下(對主要代碼進行了註釋):
1 public class ConditionObject implements Condition, java.io.Serializable { 2 private static final long serialVersionUID = 1173984872572414699L; 3 /** First node of condition queue. */ 4 private transient Node firstWaiter; // 存儲條件對列中第一個節點 5 /** Last node of condition queue. */ 6 private transient Node lastWaiter; // 存儲條件對列中最後一個節點 7 8 /** 9 * Creates a new {@code ConditionObject} instance. 10 */ 11 public ConditionObject() { } 12 13 // Internal methods 14 15 /** 16 * Adds a new waiter to wait queue. // 增長一個新的節點到等待隊列中 17 * @return its new wait node 18 */ 19 private Node addConditionWaiter() { 20 Node t = lastWaiter; 21 // 若是最後一個節點的狀態已經結束,則直接清理掉 22 // If lastWaiter is cancelled, clean out. 23 if (t != null && t.waitStatus != Node.CONDITION) { 24 // 拆分已經處於結束狀態的節點 也就是清除掉這類節點 25 unlinkCancelledWaiters(); 26 t = lastWaiter; 27 } 28 // 建立一個新的節點,帶上結點狀態,代表結點處於條件對列上 29 Node node = new Node(Thread.currentThread(), Node.CONDITION); 30 /** 31 條件隊列中加入節點都是從隊尾加入,而且從下面代碼可知,每次都會存儲最後一個節點的值。 32 當最後一個節點爲空時,說明隊列中不存在節點,因此將node賦值給第一個節點,不然將節點加入對列尾 33 */ 34 if (t == null) 35 firstWaiter = node; 36 else 37 t.nextWaiter = node; 38 lastWaiter = node; // 存儲最後一個節點的值 39 return node; 40 } 41 42 /** 43 * 喚醒節點 44 * 移除和轉換節點直到節點狀態處於未結束或者爲空 (節點移除至關於喚醒) 45 * Removes and transfers nodes until hit non-cancelled one or 46 * null. Split out from signal in part to encourage compilers 47 * to inline the case of no waiters. 48 * @param first (non-null) the first node on condition queue 49 */ 50 private void doSignal(Node first) { 51 do { 52 // 當next節點爲null時,則將lastWaiter賦值爲null 53 if ( (firstWaiter = first.nextWaiter) == null) 54 lastWaiter = null; 55 first.nextWaiter = null; // 切斷當前節點 56 } while (!transferForSignal(first) && 57 (first = firstWaiter) != null); 58 } 59 60 /** 61 * 喚醒全部節點 62 * Removes and transfers all nodes. 63 * @param first (non-null) the first node on condition queue 64 */ 65 private void doSignalAll(Node first) { 66 lastWaiter = firstWaiter = null; 67 do { 68 // 循環喚醒全部節點,代碼仍是比較容易理解 69 // 將每一個節點直接截斷便可 70 Node next = first.nextWaiter; 71 first.nextWaiter = null; 72 transferForSignal(first); 73 first = next; 74 } while (first != null); 75 } 76 77 /** 78 * Unlinks cancelled waiter nodes from condition queue. 79 * Called only while holding lock. This is called when 80 * cancellation occurred during condition wait, and upon 81 * insertion of a new waiter when lastWaiter is seen to have 82 * been cancelled. This method is needed to avoid garbage 83 * retention in the absence of signals. So even though it may 84 * require a full traversal, it comes into play only when 85 * timeouts or cancellations occur in the absence of 86 * signals. It traverses all nodes rather than stopping at a 87 * particular target to unlink all pointers to garbage nodes 88 * without requiring many re-traversals during cancellation 89 * storms. 90 */ 91 private void unlinkCancelledWaiters() { // 刪除處於結束狀態的節點 92 Node t = firstWaiter; 93 Node trail = null; 94 // 第一個節點爲空,直接返回 95 // 這裏會遍歷全部節點 96 while (t != null) { 97 Node next = t.nextWaiter; // 記錄下一個節點的值 98 // 當節點狀態不爲CONDITION 99 if (t.waitStatus != Node.CONDITION) { 100 // 首先將當前節點的下一個節點賦值爲空,切斷當前節點鏈路 101 t.nextWaiter = null; 102 // 若是追蹤節點爲空的時候,則存儲第一個節點的值爲next,由於當前節點狀態不爲CONDITION須要清理 103 if (trail == null) 104 firstWaiter = next; 105 else // 在追蹤節點串聯下一個節點,主要是爲了存儲最後一個節點的值 106 trail.nextWaiter = next; 107 if (next == null) // 當next爲空時,則存儲trail爲最後一個節點,將最後一個節點值存儲下來 108 lastWaiter = trail; 109 } 110 else // 當節點狀態爲CONDITION時,將該節點賦值給trail 111 trail = t; 112 t = next; // 將next賦值給t,繼續遍歷 113 } 114 } 115 116 // public methods 117 118 /** 119 * 喚醒等待時間最長的節點,使其擁有鎖 120 * Moves the longest-waiting thread, if one exists, from the 121 * wait queue for this condition to the wait queue for the 122 * owning lock. 123 * 124 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 125 * returns {@code false} 126 */ 127 public final void signal() { 128 // 若是線程不是獨佔資源,則拋出異常,從這裏也說明ConditionObject只能用在獨佔模式中 129 if (!isHeldExclusively()) 130 throw new IllegalMonitorStateException(); 131 Node first = firstWaiter; 132 if (first != null) 133 doSignal(first); 134 } 135 136 /** 137 * 喚醒全部等待節點 138 * Moves all threads from the wait queue for this condition to 139 * the wait queue for the owning lock. 140 * 141 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 142 * returns {@code false} 143 */ 144 public final void signalAll() { 145 if (!isHeldExclusively()) 146 throw new IllegalMonitorStateException(); 147 Node first = firstWaiter; 148 if (first != null) 149 doSignalAll(first); 150 } 151 152 /** 153 * 節點不間斷等待 154 * Implements uninterruptible condition wait. 155 * <ol> 156 * <li> Save lock state returned by {@link #getState}. 157 * <li> Invoke {@link #release} with saved state as argument, 158 * throwing IllegalMonitorStateException if it fails. 159 * <li> Block until signalled. 160 * <li> Reacquire by invoking specialized version of 161 * {@link #acquire} with saved state as argument. 162 * </ol> 163 */ 164 public final void awaitUninterruptibly() { 165 Node node = addConditionWaiter(); 166 int savedState = fullyRelease(node); 167 boolean interrupted = false; 168 while (!isOnSyncQueue(node)) { 169 LockSupport.park(this); 170 if (Thread.interrupted()) 171 interrupted = true; 172 } 173 if (acquireQueued(node, savedState) || interrupted) 174 selfInterrupt(); 175 } 176 177 /* 178 * For interruptible waits, we need to track whether to throw 179 * InterruptedException, if interrupted while blocked on 180 * condition, versus reinterrupt current thread, if 181 * interrupted while blocked waiting to re-acquire. 182 */ 183 184 /** Mode meaning to reinterrupt on exit from wait */ 185 private static final int REINTERRUPT = 1; 186 /** Mode meaning to throw InterruptedException on exit from wait */ 187 private static final int THROW_IE = -1; 188 189 /** 190 * Checks for interrupt, returning THROW_IE if interrupted 191 * before signalled, REINTERRUPT if after signalled, or 192 * 0 if not interrupted. 193 */ 194 private int checkInterruptWhileWaiting(Node node) { 195 return Thread.interrupted() ? 196 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 197 0; 198 } 199 200 /** 201 * Throws InterruptedException, reinterrupts current thread, or 202 * does nothing, depending on mode. 203 */ 204 private void reportInterruptAfterWait(int interruptMode) 205 throws InterruptedException { 206 if (interruptMode == THROW_IE) 207 throw new InterruptedException(); 208 else if (interruptMode == REINTERRUPT) 209 selfInterrupt(); 210 } 211 212 /** 213 * Implements interruptible condition wait. 214 * <ol> 215 * <li> If current thread is interrupted, throw InterruptedException. 216 * <li> Save lock state returned by {@link #getState}. 217 * <li> Invoke {@link #release} with saved state as argument, 218 * throwing IllegalMonitorStateException if it fails. 219 * <li> Block until signalled or interrupted. 220 * <li> Reacquire by invoking specialized version of 221 * {@link #acquire} with saved state as argument. 222 * <li> If interrupted while blocked in step 4, throw InterruptedException. 223 * </ol> 224 */ 225 public final void await() throws InterruptedException { 226 if (Thread.interrupted()) 227 throw new InterruptedException(); 228 Node node = addConditionWaiter(); 229 int savedState = fullyRelease(node); 230 int interruptMode = 0; 231 while (!isOnSyncQueue(node)) { 232 LockSupport.park(this); 233 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 234 break; 235 } 236 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 237 interruptMode = REINTERRUPT; 238 if (node.nextWaiter != null) // clean up if cancelled 239 unlinkCancelledWaiters(); 240 if (interruptMode != 0) 241 reportInterruptAfterWait(interruptMode); 242 } 243 244 /** 245 * Implements timed condition wait. 246 * <ol> 247 * <li> If current thread is interrupted, throw InterruptedException. 248 * <li> Save lock state returned by {@link #getState}. 249 * <li> Invoke {@link #release} with saved state as argument, 250 * throwing IllegalMonitorStateException if it fails. 251 * <li> Block until signalled, interrupted, or timed out. 252 * <li> Reacquire by invoking specialized version of 253 * {@link #acquire} with saved state as argument. 254 * <li> If interrupted while blocked in step 4, throw InterruptedException. 255 * </ol> 256 */ 257 public final long awaitNanos(long nanosTimeout) 258 throws InterruptedException { 259 if (Thread.interrupted()) 260 throw new InterruptedException(); 261 Node node = addConditionWaiter(); 262 int savedState = fullyRelease(node); 263 final long deadline = System.nanoTime() + nanosTimeout; 264 int interruptMode = 0; 265 while (!isOnSyncQueue(node)) { 266 if (nanosTimeout <= 0L) { 267 transferAfterCancelledWait(node); 268 break; 269 } 270 if (nanosTimeout >= spinForTimeoutThreshold) 271 LockSupport.parkNanos(this, nanosTimeout); 272 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 273 break; 274 nanosTimeout = deadline - System.nanoTime(); 275 } 276 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 277 interruptMode = REINTERRUPT; 278 if (node.nextWaiter != null) 279 unlinkCancelledWaiters(); 280 if (interruptMode != 0) 281 reportInterruptAfterWait(interruptMode); 282 return deadline - System.nanoTime(); 283 } 284 285 /** 286 * Implements absolute timed condition wait. 287 * <ol> 288 * <li> If current thread is interrupted, throw InterruptedException. 289 * <li> Save lock state returned by {@link #getState}. 290 * <li> Invoke {@link #release} with saved state as argument, 291 * throwing IllegalMonitorStateException if it fails. 292 * <li> Block until signalled, interrupted, or timed out. 293 * <li> Reacquire by invoking specialized version of 294 * {@link #acquire} with saved state as argument. 295 * <li> If interrupted while blocked in step 4, throw InterruptedException. 296 * <li> If timed out while blocked in step 4, return false, else true. 297 * </ol> 298 */ 299 public final boolean awaitUntil(Date deadline) 300 throws InterruptedException { 301 long abstime = deadline.getTime(); 302 if (Thread.interrupted()) 303 throw new InterruptedException(); 304 Node node = addConditionWaiter(); 305 int savedState = fullyRelease(node); 306 boolean timedout = false; 307 int interruptMode = 0; 308 while (!isOnSyncQueue(node)) { 309 if (System.currentTimeMillis() > abstime) { 310 timedout = transferAfterCancelledWait(node); 311 break; 312 } 313 LockSupport.parkUntil(this, abstime); 314 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 315 break; 316 } 317 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 318 interruptMode = REINTERRUPT; 319 if (node.nextWaiter != null) 320 unlinkCancelledWaiters(); 321 if (interruptMode != 0) 322 reportInterruptAfterWait(interruptMode); 323 return !timedout; 324 } 325 326 /** 327 * Implements timed condition wait. 328 * <ol> 329 * <li> If current thread is interrupted, throw InterruptedException. 330 * <li> Save lock state returned by {@link #getState}. 331 * <li> Invoke {@link #release} with saved state as argument, 332 * throwing IllegalMonitorStateException if it fails. 333 * <li> Block until signalled, interrupted, or timed out. 334 * <li> Reacquire by invoking specialized version of 335 * {@link #acquire} with saved state as argument. 336 * <li> If interrupted while blocked in step 4, throw InterruptedException. 337 * <li> If timed out while blocked in step 4, return false, else true. 338 * </ol> 339 */ 340 public final boolean await(long time, TimeUnit unit) 341 throws InterruptedException { 342 long nanosTimeout = unit.toNanos(time); 343 if (Thread.interrupted()) 344 throw new InterruptedException(); 345 Node node = addConditionWaiter(); 346 int savedState = fullyRelease(node); 347 final long deadline = System.nanoTime() + nanosTimeout; 348 boolean timedout = false; 349 int interruptMode = 0; 350 while (!isOnSyncQueue(node)) { 351 if (nanosTimeout <= 0L) { 352 timedout = transferAfterCancelledWait(node); 353 break; 354 } 355 if (nanosTimeout >= spinForTimeoutThreshold) 356 LockSupport.parkNanos(this, nanosTimeout); 357 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 358 break; 359 nanosTimeout = deadline - System.nanoTime(); 360 } 361 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 362 interruptMode = REINTERRUPT; 363 if (node.nextWaiter != null) 364 unlinkCancelledWaiters(); 365 if (interruptMode != 0) 366 reportInterruptAfterWait(interruptMode); 367 return !timedout; 368 } 369 370 // support for instrumentation 371 372 /** 373 * Returns true if this condition was created by the given 374 * synchronization object. 375 * 376 * @return {@code true} if owned 377 */ 378 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { 379 return sync == AbstractQueuedSynchronizer.this; 380 } 381 382 /** 383 * Queries whether any threads are waiting on this condition. 384 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. 385 * 386 * @return {@code true} if there are any waiting threads 387 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 388 * returns {@code false} 389 */ 390 protected final boolean hasWaiters() { 391 if (!isHeldExclusively()) 392 throw new IllegalMonitorStateException(); 393 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 394 if (w.waitStatus == Node.CONDITION) 395 return true; 396 } 397 return false; 398 } 399 400 /** 401 * Returns an estimate of the number of threads waiting on 402 * this condition. 403 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. 404 * 405 * @return the estimated number of waiting threads 406 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 407 * returns {@code false} 408 */ 409 protected final int getWaitQueueLength() { 410 if (!isHeldExclusively()) 411 throw new IllegalMonitorStateException(); 412 int n = 0; 413 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 414 if (w.waitStatus == Node.CONDITION) 415 ++n; 416 } 417 return n; 418 } 419 420 /** 421 * Returns a collection containing those threads that may be 422 * waiting on this Condition. 423 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. 424 * 425 * @return the collection of threads 426 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 427 * returns {@code false} 428 */ 429 protected final Collection<Thread> getWaitingThreads() { 430 if (!isHeldExclusively()) 431 throw new IllegalMonitorStateException(); 432 ArrayList<Thread> list = new ArrayList<Thread>(); 433 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 434 if (w.waitStatus == Node.CONDITION) { 435 Thread t = w.thread; 436 if (t != null) 437 list.add(t); 438 } 439 } 440 return list; 441 } 442 }
因爲AQS分獨佔模式和共享模式,所以這裏按獨佔、共享模式的順序對AQS的成員函數進行分析。
①acquire(int arg)
獨佔模式下獲取資源,若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。源碼以下:
1 /** 2 * Acquires in exclusive mode, ignoring interrupts. Implemented 3 * by invoking at least once {@link #tryAcquire}, 4 * returning on success. Otherwise the thread is queued, possibly 5 * repeatedly blocking and unblocking, invoking {@link 6 * #tryAcquire} until success. This method can be used 7 * to implement method {@link Lock#lock}. 8 * 9 * @param arg the acquire argument. This value is conveyed to 10 * {@link #tryAcquire} but is otherwise uninterpreted and 11 * can represent anything you like. 12 */ 13 public final void acquire(int arg) { 14 if (!tryAcquire(arg) && 15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 16 selfInterrupt(); 17 }
該函數執行流程:
A.若是tryAcquire()成功獲取資源,則直接返回。
B.直接獲取資源失敗,則經過addWaiter()將線程加入隊列尾,並標記爲獨佔模式。
C.經過acquireQueued()讓線程在等待隊列中獲取資源,經過自旋方式,一直獲取到後才返回。若是在等待過程當中被中斷過,則返回true,不然返回false。
D.若是線程在等待獲取資源的過程當中被中斷,只有在獲取到資源後纔會去響應,執行selfInterrupt進行自我中斷。
#1.tryAcquire(int)
該方法是在獨佔模式下獲取資源,成功-ture,失敗-false。
1 protected boolean tryAcquire(int arg) { 2 throw new UnsupportedOperationException(); 3 }
直接調用該方法會拋出異常,由於AQS只是一個框架,只是定義該接口,具體實現需在子類中實現。
#2.addWaiter(Node mode)
將當前線程加入等待隊列的隊尾,並返回當前線程所在的節點。
1 private Node addWaiter(Node mode) { 2 // 建立節點,以獨佔模式 3 Node node = new Node(Thread.currentThread(), mode); 4 // Try the fast path of enq; backup to full enq on failure 5 // 嘗試將節點快速放入隊尾 6 Node pred = tail; 7 if (pred != null) { 8 node.prev = pred; 9 // 主要經過CAS入隊尾 10 if (compareAndSetTail(pred, node)) { 11 pred.next = node; 12 return node; 13 } 14 } 15 // 若是快速入隊尾失敗,則經過enq方式入對尾 16 enq(node); 17 return node; 18 }
CAS操做後面討論,這裏先看enq(final Node node)入隊尾操做。
1 private Node enq(final Node node) { 2 // 這裏是CAS的「自旋」操做,直到將節點成功加入隊尾 3 for (;;) { 4 Node t = tail; 5 // 由於每次入隊都是從隊尾加入,當隊尾爲null,則代表隊列爲null,則需初始化頭結點 6 // 並將尾節點也指向頭節點 7 if (t == null) { // Must initialize 8 if (compareAndSetHead(new Node())) 9 tail = head; 10 } else { // 經過CAS入隊尾,自旋操做 11 node.prev = t; 12 if (compareAndSetTail(t, node)) { 13 t.next = node; 14 return t; 15 } 16 } 17 } 18 }
在線程入隊尾後,就須要acquireQueued函數了,該函數的做用是讓線程拿到資源,固然仍是經過自旋的方式來拿資源,也是就是一個排隊的過程。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; // 標記是否成功拿到資源 3 try { 4 boolean interrupted = false; // 標記在等待過程當中是否被中斷過 5 // 自旋操做 6 for (;;) { 7 final Node p = node.predecessor(); // 拿到當前節點的前向節點 8 // 若是前向節點爲head,則代表當前節點排在第二位了,已經獲得獲取資源的資格 9 if (p == head && tryAcquire(arg)) { 10 // 成功拿到資源後,將head節點指向當前節點 11 // 從這裏能夠看出,head節點就是當前獲取到鎖的節點 12 setHead(node); 13 // 將原來head節點的next設置爲null,方便GC回收之前的head節點,也就意味着以前拿到鎖的節點出隊列了 14 p.next = null; // help GC 15 failed = false; 16 return interrupted; // 返回在排隊過程當中線程是否被中斷過 17 } 18 // 到這裏,代表線程處於等待狀態,自旋直到被unpark 19 if (shouldParkAfterFailedAcquire(p, node) && 20 parkAndCheckInterrupt()) 21 interrupted = true; 22 } 23 } finally { 24 if (failed) // 獲取資源失敗,則將節點標記爲結束狀態 25 cancelAcquire(node); 26 } 27 }
在線程排隊等待的過程當中,有兩個關鍵函數shouldParkAfterFailedAcquire(Node pred, Node node)和parkAndCheckInterrupt()。
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus; // 前驅節點的狀態 3 if (ws == Node.SIGNAL) 4 // 若是前驅節點正處於被喚醒的狀態,則正常排隊等待便可 5 /* 6 * This node has already set status asking a release 7 * to signal it, so it can safely park. 8 */ 9 return true; 10 if (ws > 0) { // 前驅節點處於結束狀態 11 /* 12 * Predecessor was cancelled. Skip over predecessors and 13 * indicate retry. 14 */ 15 /* 16 *繼續向下找,一直找處處於正常等待狀態的節點,將當前節點插入其後,其餘 17 *無用節點造成一個鏈,會被GC 18 */ 19 do { 20 node.prev = pred = pred.prev; 21 } while (pred.waitStatus > 0); 22 pred.next = node; 23 } else { 24 /* 25 * waitStatus must be 0 or PROPAGATE. Indicate that we 26 * need a signal, but don't park yet. Caller will need to 27 * retry to make sure it cannot acquire before parking. 28 */ 29 // 前驅節點狀態正常,則把前驅節點的狀態設置爲SIGNAL,這樣前驅節點拿到資源後,可通知下當前節點 30 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 31 } 32 return false; 33 }
分析以上源碼可知:只有當前驅節點的狀態爲SIGNAL時,當前節點才能正常排隊等待,不然需找到一個合適的節點next位置來進行排隊等待。
1 private final boolean parkAndCheckInterrupt() { 2 // 使線程進入waitting狀態 3 LockSupport.park(this); 4 return Thread.interrupted(); // 返回線程是否被中斷過 5 }
該函數做用:當節點正常進入排隊後,讓線程進入等待狀態。
至此acquireQueued()函數總結完成,該函數的具體執行流程:
#1.首先檢查節點是否能夠當即獲取資源。
#2.若是不能當即獲取資源,則進行排隊,這裏須要找到正確的排隊點,直到unpark或interrupt喚醒本身。
#3.喚醒後,判斷本身是否有資格獲取資源,若是拿到資源,則將head指向當前節點,並返回在等待過程是否被中斷過,若是沒拿到資源,則繼續流程2。
到這裏acquire(int)函數分析結束,這個函數很是重要,這裏再貼上源碼:
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
#1.調用子類的tryAcquire直接獲取資源,若是成功則返回。
#2.若是流程1失敗,則將線程加入等待隊列的隊尾(獨佔模式)。
#3.在acquireQueued中排隊,經過自旋獲取資源,直到獲取資源才返回。若是在排隊過程當中線程被中斷過返回true,不然返回false。
#4.在排隊過程當中被中斷是不響應的,只有獲取到資源後,才進行自我中斷,補上中斷標記。
整個過程的流程圖以下:
②release(int)獨佔模式釋放資源。
1 public final boolean release(int arg) { 2 // 嘗試釋放資源 3 if (tryRelease(arg)) { 4 Node h = head; 5 if (h != null && h.waitStatus != 0) 6 unparkSuccessor(h); // 喚醒隊列中下一個線程 7 return true; 8 } 9 return false; 10 }
釋放鎖的函數很簡單,經過tryRelease嘗試釋放資源,而後喚醒隊列中的其餘線程。
tryRelease(int):
1 protected boolean tryRelease(int arg) { 2 throw new UnsupportedOperationException(); 3 }
與tryAcquire函數同樣,該方法須要子類去實現,若是直接調用會拋異常。
unparkSuccessor(Node node):
喚醒等待隊列中的下一個線程,這裏喚醒的是等待隊列中最前邊那個未放棄的線程,注意看代碼註釋。
1 private void unparkSuccessor(Node node) { 2 /* 3 * If status is negative (i.e., possibly needing signal) try 4 * to clear in anticipation of signalling. It is OK if this 5 * fails or if status is changed by waiting thread. 6 */ 7 int ws = node.waitStatus; // 獲取當前線程的狀態 8 if (ws < 0) // 若是當前線程狀態處於可用狀態,則直接將狀態值置0 9 compareAndSetWaitStatus(node, ws, 0); 10 11 /* 12 * Thread to unpark is held in successor, which is normally 13 * just the next node. But if cancelled or apparently null, 14 * traverse backwards from tail to find the actual 15 * non-cancelled successor. 16 */ 17 Node s = node.next; // 下一個節點 18 if (s == null || s.waitStatus > 0) { // 若是節點爲null或節點已處於結束狀態 19 s = null; 20 // 從隊列尾向前遍歷,找到next可用的節點,狀態小於0就可用,這裏的節點是隊列中最前邊的可用節點 21 for (Node t = tail; t != null && t != node; t = t.prev) 22 if (t.waitStatus <= 0) 23 s = t; 24 } 25 if (s != null) 26 LockSupport.unpark(s.thread);// 喚醒next線程 27 }
獨佔模式的主要函數分析完畢,接下來看共享模式。
②acquireShared(int)
共享模式下獲取資源,若是成功則直接返回,不然進入等待隊列,經過自旋直到獲取資源爲止。
1 public final void acquireShared(int arg) { 2 // 共享模式下獲取資源,若是獲取失敗,則進入等待隊列 3 // 一樣該函數須要子類去實現 4 if (tryAcquireShared(arg) < 0) 5 doAcquireShared(arg); // 進入等待隊列直到鎖獲取到爲止 6 }
tryAcquireShared(int)函數返回值,須要注意下:
負數:表示獲取失敗;
0:獲取成功,但沒有剩餘資源;
正數:獲取成功,且有剩餘資源;
#1.doAcquireShared(int)
將線程加入隊列尾,而後經過自旋獲取資源,直到獲得資源才返回。
1 private void doAcquireShared(int arg) { 2 final Node node = addWaiter(Node.SHARED); // 將線程加入隊尾,經過共享模式 3 boolean failed = true;// 是否成功 4 try { 5 boolean interrupted = false; // 在自旋過程當中是否被中斷過 6 for (;;) { 7 final Node p = node.predecessor(); // 前驅節點 8 if (p == head) { // 這裏代表當前節點處於head的next位,此時node被喚醒,極可能是head用完來喚醒 9 int r = tryAcquireShared(arg); // 獲取資源 10 if (r >= 0) { // 成功 11 setHeadAndPropagate(node, r);// 將head指向本身,還有剩餘資源可用的話再喚醒以後的線程 12 p.next = null; // help GC 無用鏈,幫助GC 13 if (interrupted) // 若是等待過程當中被中斷過,將中斷補上 14 selfInterrupt(); 15 failed = false; 16 return; 17 } 18 } 19 // 線程未排在head以後,繼續排隊,進入waiting狀態,等着unpark 20 if (shouldParkAfterFailedAcquire(p, node) && 21 parkAndCheckInterrupt()) 22 interrupted = true; // 中斷標記 23 } 24 } finally { 25 if (failed) 26 cancelAcquire(node); 27 } 28 }
整個流程與獨佔模式的acquireQueued很類似,只是共享模式下,在喚醒本身後,若是還有剩餘資源,須要喚醒後續節點。
setHeadAndPropagate(node, int)
將head節點設置爲當前節點,若是還有剩餘資源,則喚醒下一個線程。
1 private void setHeadAndPropagate(Node node, int propagate) { 2 Node h = head; // Record old head for check below 3 setHead(node); // 將隊列中的head執行當前節點 4 /* 5 * Try to signal next queued node if: 6 * Propagation was indicated by caller, 7 * or was recorded (as h.waitStatus either before 8 * or after setHead) by a previous operation 9 * (note: this uses sign-check of waitStatus because 10 * PROPAGATE status may transition to SIGNAL.) 11 * and 12 * The next node is waiting in shared mode, 13 * or we don't know, because it appears null 14 * 15 * The conservatism in both of these checks may cause 16 * unnecessary wake-ups, but only when there are multiple 17 * racing acquires/releases, so most need signals now or soon 18 * anyway. 19 */ 20 // 若是還有剩餘資源,則喚醒後續線程 21 if (propagate > 0 || h == null || h.waitStatus < 0 || 22 (h = head) == null || h.waitStatus < 0) { 23 Node s = node.next; 24 if (s == null || s.isShared()) 25 doReleaseShared(); 26 } 27 }
這裏除了將head設置成當前線程,若是有剩餘資源,須要喚醒後續節點。
doReleaseShared()
1 private void doReleaseShared() { 2 /* 3 * Ensure that a release propagates, even if there are other 4 * in-progress acquires/releases. This proceeds in the usual 5 * way of trying to unparkSuccessor of head if it needs 6 * signal. But if it does not, status is set to PROPAGATE to 7 * ensure that upon release, propagation continues. 8 * Additionally, we must loop in case a new node is added 9 * while we are doing this. Also, unlike other uses of 10 * unparkSuccessor, we need to know if CAS to reset status 11 * fails, if so rechecking. 12 */ 13 // 自旋操做 14 for (;;) { 15 Node h = head; 16 if (h != null && h != tail) { 17 int ws = h.waitStatus; 18 if (ws == Node.SIGNAL) { // 若是head狀態爲SIGNAL,則需喚醒後續節點 19 // CAS一下當前節點的狀態,判斷是否爲SIGNAL,若是是則置爲0,不然繼續循環 20 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 21 continue; // loop to recheck cases 22 unparkSuccessor(h); // 喚醒後繼節點 23 } 24 // 若是head節點狀態爲0,且CAS置爲傳播狀態失敗,則繼續循環,由於if操做中會改變節點的狀態 25 else if (ws == 0 && 26 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 27 continue; // loop on failed CAS 28 } 29 if (h == head) // 若是head節點發生了改變,則繼續自旋操做,防止上述操做過程當中添加了節點的狀況 // loop if head changed 30 break; 31 } 32 }
該方法的做用主要是用於喚醒後續節點。
共享模式獲取鎖操做與獨佔模式基本相同:先直接獲取資源,若是成功,直接返回;若是失敗,則將線程加入等待隊列尾,直到獲取到資源才返回,整個過程忽略中斷。不一樣點在於共享模式下本身拿到資源後,還須要喚醒後續節點。
#2.releaseShared(int)
同享模式下釋放資源
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) { // 嘗試釋放資源 3 doReleaseShared(); // 喚醒後續節點,前面已經分析 4 return true; 5 } 6 return false; 7 }
共享模式釋放資源與獨佔模式相似,可是獨佔模式下須要徹底釋放資源後,纔會返回true,而共享模式沒有這種要求。
這裏只是對AQS的頂層框架進行了簡要的分析,具體須要深刻其子類中去,AQS的子類按模式分類可聚合成如下幾類:
#1.獨佔模式:
ReentrantLock:可重入鎖。state=0獨佔鎖,或者同一線程可屢次獲取鎖(獲取+1,釋放-1)。
Worker(java.util.concurrent.ThreadPoolExecutor類中的內部類)線程池類。shutdown關閉空閒工做線程,中斷worker工做線程是獨佔的,互斥的。
#2.共享模式:
Semaphore:信號量。 控制同時有多少個線程能夠進入代碼段。(互斥鎖的拓展)
CountDownLatch:倒計時器。 初始化一個值,多線程減小這個值,直到爲0,倒計時完畢,執行後續代碼。
#3.獨佔+共享模式:
ReentrantReadWriteLock:可重入讀寫鎖。獨佔寫+共享讀,即併發讀,互斥寫。
後續對這些類進行詳細分析。
by Shawn Chen,2019.1.29日,下午。