它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列,有個內部類Node定義了節點。隊列由AQS的volatile成員變量head和tail組成一個雙向鏈表)
1.1 acquire(int)
/**
 * Acquires in exclusive mode, ignoring interrupts.
 *
 * <p>{@link #tryAcquire} is invoked at least once; if it succeeds the
 * method returns immediately. Otherwise the calling thread is enqueued as
 * an exclusive waiter and may block and unblock repeatedly, retrying
 * {@link #tryAcquire} until it succeeds. This method can be used to
 * implement {@link Lock#lock}.
 *
 * @param arg the acquire argument. It is handed to {@link #tryAcquire}
 *        unchanged and is otherwise uninterpreted, so it can represent
 *        anything you like.
 */
public final void acquire(int arg) {
    // Fast path: the resource was free, nothing to queue.
    if (tryAcquire(arg)) {
        return;
    }
    // Slow path: enqueue as an exclusive waiter and wait for our turn.
    // acquireQueued reports whether an interrupt arrived while waiting.
    final boolean interruptedWhileQueued =
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg);
    if (interruptedWhileQueued) {
        // NOTE(review): selfInterrupt is defined elsewhere; presumably it
        // re-asserts the interrupt status swallowed during the wait.
        selfInterrupt();
    }
}
1. tryAcquire():嘗試獲取資源。 2. addWaiter(Node.EXCLUSIVE):獲取資源失敗,將該線程加入等待隊列尾部,標記爲獨佔模式。 3. acquireQueued(Node,int):獲取該node指定數量的資源數,會一直等待成功獲取才返回,返回值是在獲取期間是否中斷過
1. tryAcquire()
/**
 * Attempts to acquire in exclusive mode. An implementation should check
 * whether the state of the object permits an exclusive acquire and, if
 * so, perform it.
 *
 * <p>The thread performing the acquire is always the one that invokes
 * this method. When it reports failure, the acquire method may queue the
 * thread (if it is not queued already) until a release from some other
 * thread signals it. This can be used to implement
 * {@link Lock#tryLock()}.
 *
 * <p>The default implementation throws
 * {@link UnsupportedOperationException}.
 *
 * @param arg the acquire argument. This is always the value passed to an
 *        acquire method, or the value saved on entry to a condition
 *        wait. It is otherwise uninterpreted and can represent anything
 *        you like.
 * @return {@code true} if successful. Upon success, this object has been
 *         acquired.
 * @throws IllegalMonitorStateException if acquiring would place this
 *         synchronizer in an illegal state. This exception must be
 *         thrown in a consistent fashion for synchronization to work
 *         correctly.
 * @throws UnsupportedOperationException if exclusive mode is not supported
 */
protected boolean tryAcquire(int arg) {
    // The base class offers no exclusive-mode behaviour of its own;
    // subclasses opt in by overriding this method.
    final UnsupportedOperationException notSupported =
        new UnsupportedOperationException();
    throw notSupported;
}
2. addWaiter(Node)
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //新建Node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //快速嘗試一次,使用CAS將node放到隊尾,失敗調用enq Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { = node; return node; } } //保證將Node放入隊尾 enq(node); return node; }
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; //若是尾節點爲空,說明隊列還未進行初始化 if (t == null) { // Must initialize //CAS設置頭結點 if (compareAndSetHead(new Node())) //初始頭尾相同,從下一次循環開始嘗試加入新Node tail = head; } else { node.prev = t; //CAS將當前節點設置爲尾節點 if (compareAndSetTail(t, node)) { //設置成功返回當前節點 = node; return t; } } } }
3. acquireQueued(Node, int)
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { //標誌是否成功獲取資源 boolean failed = true; try { //是否被中斷 boolean interrupted = false; for (;;) { //獲取前驅Node final Node p = node.predecessor(); //若是本身是隊列中第二個節點,那會進行嘗試獲取,進入這裏判斷要麼是一次,要麼是被前驅節點給unPark喚醒了。 if (p == head && tryAcquire(arg)) { //成功獲取資源,設置自身爲頭節點,將原來的頭結點剝離隊列 setHead(node); = null; // help GC failed = false; return interrupted; } //判斷是否須要被park,若是須要進行park並檢測是否被中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //若是獲取資源失敗了將當前node取消, if (failed) cancelAcquire(node); } }
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //若是前驅的狀態已是signal,表明前驅釋放是會通知喚醒你,那麼此node能夠安心被park if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //若是前驅已經被取消,那麼從當前node一直往前找,直到有非取消的node,直接排在它的後面,此時不須要park,會出去再嘗試一次獲取資源。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //前驅節點沒有被取消,那麼告訴前驅節點釋放的時候通知本身 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { //讓該線程進入wait狀態 LockSupport.park(this); //返回期間是被中斷過 return Thread.interrupted(); }
2.找到「有效」(not canceled)的前驅,並通知前驅釋放了要「通知」(waitStatus=SIGNAL)我,安心被park。
1.2 release(int)
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { //調用實現者的嘗試解鎖方法,由於已經得到鎖,因此基本不會失敗 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒下一個節點 unparkSuccessor(h); return true; } return false; }
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0)//設置當前節點的狀態容許失敗,失敗了也不要緊。 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //找到下一個須要被喚醒的節點 Node s =; if (s == null || s.waitStatus > 0) {//若是是空或被取消 s = null; //從尾節點開始尋找,直到找到最前面的有效的節點。由於鎖已經釋放,因此從尾節點開始找能夠避免由於高併發下複雜的隊列動態變化帶來的邏輯判斷, for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
1.3 acquireShared(int)
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { //嘗試獲取指定數量資源 if (tryAcquireShared(arg) < 0) //獲取資源直到成功 doAcquireShared(arg); }
/**
 * Attempts to acquire in shared mode. An implementation should check
 * whether the state of the object permits a shared acquire and, if so,
 * perform it.
 *
 * <p>The thread performing the acquire is always the one that invokes
 * this method. When it reports failure, the acquire method may queue the
 * thread (if it is not queued already) until a release from some other
 * thread signals it.
 *
 * <p>The default implementation throws
 * {@link UnsupportedOperationException}.
 *
 * @param arg the acquire argument. This is always the value passed to an
 *        acquire method, or the value saved on entry to a condition
 *        wait. It is otherwise uninterpreted and can represent anything
 *        you like.
 * @return a negative value on failure; zero if acquisition in shared
 *         mode succeeded but no subsequent shared-mode acquire can
 *         succeed; and a positive value if acquisition in shared mode
 *         succeeded and subsequent shared-mode acquires might also
 *         succeed, in which case a subsequent waiting thread must check
 *         availability. (Support for three different return values
 *         enables this method to be used in contexts where acquires only
 *         sometimes act exclusively.) Upon success, this object has been
 *         acquired.
 * @throws IllegalMonitorStateException if acquiring would place this
 *         synchronizer in an illegal state. This exception must be
 *         thrown in a consistent fashion for synchronization to work
 *         correctly.
 * @throws UnsupportedOperationException if shared mode is not supported
 */
protected int tryAcquireShared(int arg) {
    // The base class offers no shared-mode behaviour of its own;
    // subclasses opt in by overriding this method.
    final UnsupportedOperationException notSupported =
        new UnsupportedOperationException();
    throw notSupported;
}
/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { //添加當前線程的Node模式爲共享模式至隊尾, final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取前驅節點 final Node p = node.predecessor(); //若是本身是老二纔有嘗試的資格 if (p == head) { //嘗試獲取指定數量資源 int r = tryAcquireShared(arg); if (r >= 0) { //若是成功獲取,將當前節點設置爲頭節點,若是有剩餘資源喚醒下一有效節點 setHeadAndPropagate(node, r); = null; // help GC //若是有中斷,本身補償中斷 if (interrupted) selfInterrupt(); failed = false; return; } } //判斷是否須要被park,和park後檢查是否被中弄斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //若是獲取失敗,取消當前節點 if (failed) cancelAcquire(node); } }
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //將當前節點設置爲head節點 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //若是有剩餘資源 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s =; //當下一個有效節點存在且是共享模式時,會喚醒它 if (s == null || s.isShared()) doReleaseShared(); } }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //若是頭結點狀態是「通知後繼」 if (ws == Node.SIGNAL) { //將其狀態改成0,表示已通知 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //喚醒後繼 unparkSuccessor(h); } //若是已通知後繼,則改成可傳播,在下次acquire中的shouldParkAfterFailedAcquire會將改成SIGNAL else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //若是頭結點變了,再次循環 if (h == head) // loop if head changed break; } }
1.4 releaseShared(int)
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { //嘗試共享模式獲取資源 if (tryReleaseShared(arg)) { //喚醒下一節點 doReleaseShared(); return true; } return false; }