AbstractQueuedSynchronizer簡稱AQS是一個抽象同步框架,能夠用來實現一個依賴狀態的同步器。
JDK1.5中提供的java.util.concurrent包中的大多數的同步器(Synchronizer)如Lock, Semaphore, Latch, Barrier等,這些類之間大多能夠互相實現,如使用Lock實現一個Semaphore或者反過來,可是它們都是基於java.util.concurrent.locks.AbstractQueuedSynchronizer
這個類的框架實現的,理解了這個稍微複雜抽象的類再去理解其餘的同步器就很輕鬆了。java
AQS的核心是一個線程等待隊列,採用的是一個先進先出FIFO隊列。用來實現一個非阻塞的同步器隊列有主要有兩個選擇Mellor-Crummey and Scott (MCS) locks和Craig, Landin, and Hagersten (CLH) locks的變種。CLH鎖更適合處理取消和超時,因此AQS基於CLH進行修改做爲線程等待隊列。
CLH隊列使用pred引用前節點造成一個隊列,入隊enqueue和出隊dequeue操做均可以經過原子操做完成。node
在 AQS 內部,經過維護一個FIFO 隊列
來管理多線程的排隊工做。在公平競爭的狀況下,沒法獲取同步狀態的線程將會被封裝成一個節點,置於隊列尾部。入隊的線程將會經過自旋的方式獲取同步狀態,若在有限次的嘗試後,仍未獲取成功,線程則會被阻塞住。大體示意圖以下:算法
當頭結點釋放同步狀態後,且後繼節點對應的線程被阻塞,此時頭結點線程將會去喚醒後繼節點線程。後繼節點線程恢復運行並獲取同步狀態後,會將舊的頭結點從隊列中移除,並將本身設爲頭結點。大體示意圖以下:多線程
其中每一個節點包含以下狀態:app
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
CANCELED表示線程等待已經取消,是惟一一個大於0的狀態。
SINALG表示須要喚醒next節點
CONDITION代表線程正在等待一個條件
PROPAGATE用於acquireShared中向後傳播
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
一、若是嘗試獲取鎖成功整個獲取操做就結束,不然轉到2. 嘗試獲取鎖是經過方法tryAcquire來實現的,AQS中並無該方法的具體實現,只是簡單地拋出一個不支持操做異常,在AQS簡介中談到tryAcquire有不少實現方法,這裏再也不細化,只須要知道若是獲取鎖成功該方法返回true便可;框架
二、若是獲取鎖失敗,那麼就建立一個表明當前線程的結點加入到等待隊列的尾部,是經過addWaiter方法實現的,來看該方法的具體實現:oop
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
該方法建立了一個獨佔式結點,而後判斷隊列中是否有元素,若是有(pred!=null)就設置當前結點爲隊尾結點,即將當前節點插入到尾節點的後面,而後返回;ui
若是沒有元素(pred==null),表示隊列爲空,走的是入隊操做this
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq方法採用的是變種CLH算法,先看頭結點是否爲空,若是爲空就建立一個傀儡結點,頭尾指針都指向這個傀儡結點,這一步只會在隊列初始化時會執行;atom
若是頭結點非空,就採用CAS操做將當前結點插入到頭結點後面,若是在插入的時候尾結點有變化,就將尾結點向後移動直到移動到最後一個結點爲止,而後再把當前結點插入到尾結點後面,尾指針指向當前結點,入隊成功。
三、將新加入的結點放入隊列以後,這個結點有兩種狀態,要麼獲取鎖,要麼就掛起,若是這個結點不是頭結點的後繼節點,就看看這個結點是否應該掛起,若是應該掛起,就掛起當前結點,是否應該掛起是經過shouldParkAfterFailedAcquire方法來判斷的
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //判斷前驅節點是不是頭節點,若是是,則循環獲取同步 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //若是獲取同步狀態失敗或者前驅節點不是頭節點,則判斷是否將當前節點掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ /** * 該方法主要用途是,當線程在獲取同步狀態失敗時,根據前驅節點的等待狀態,決定後續的動做。好比前驅 * 節點等待狀態爲 SIGNAL,代表當前節點線程應該被阻塞住了。不能總是嘗試,避免 CPU 忙等。 * ————————————————————————————————————————————————————————————————— * | 前驅節點等待狀態 | 相應動做 | * ————————————————————————————————————————————————————————————————— * | SIGNAL | 阻塞 | * | CANCELLED | 向前遍歷, 移除前面全部爲該狀態的節點 | * | waitStatus < 0 | 將前驅節點狀態設爲 SIGNAL, 並再次嘗試獲取同步狀態 | * ————————————————————————————————————————————————————————————————— */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //首先檢查前趨結點的waitStatus位,若是爲SIGNAL,表示前趨結點會通知它,那麼它能夠放心大膽地掛起了 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //若是前趨結點是一個被取消的結點怎麼辦呢?那麼就向前遍歷跳過被取消的結點,直到找到一個沒有被取消的結點爲止,將找到的這個結點做爲它的前趨結點,
//將找到的這個結點的waitStatus位設置爲SIGNAL,返回false表示線程不該該被掛起,繼續嘗試獲取鎖 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 等待狀態爲 0 或 PROPAGATE,設置前驅節點等待狀態爲 SIGNAL, * 並再次嘗試獲取同步狀態。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { // 調用 LockSupport.park 阻塞本身 LockSupport.park(this); return Thread.interrupted(); }
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
一、release過程比acquire要簡單,首先調用tryRelease釋放鎖,若是釋放失敗,直接返回;
二、釋放鎖成功後須要喚醒繼任結點,是經過方法unparkSuccessor實現的
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
一、node參數傳進來的是頭結點,首先檢查頭結點的waitStatus位,若是爲負,表示頭結點還須要通知後繼結點,這裏不須要頭結點去通知後繼,所以將該該標誌位清0.
二、而後查看頭結點的下一個結點,若是下一個結點不爲空且它的waitStatus<=0,表示後繼結點沒有被取消,是一個能夠喚醒的結點,因而喚醒後繼結點返回;若是後繼結點爲空或者被取消了怎麼辦?尋找下一個可喚醒的結點,而後喚醒它返回。
這裏並無從頭向尾尋找,從隊列尾部開始向前查找,找到隊列最前面沒有被取消的節點,而後,將其喚醒。
爲何須要從隊列尾部開始向前查找呢?
由於在CLH隊列中的結點隨時有可能被中斷,被中斷的結點的waitStatus設置爲CANCEL,並且它會被踢出CLH隊列,如何個踢出法,就是它的前趨結點的next並不會指向它,而是指向下一個非CANCEL的結點,而它本身的next指針指向它本身。一旦這種狀況發生,如何從頭向尾方向尋找繼任結點會出現問題,由於一個CANCEL結點的next爲本身,那麼就找不到正確的繼任接點。
有的人又會問了,CANCEL結點的next指針爲何要指向它本身,爲何不指向真正的next結點?爲何不爲NULL?
第一個問題的答案是這種被CANCEL的結點最終會被GC回收,若是指向next結點,GC沒法回收。