1.什麼是AQS?html
AQS的核心思想是基於volatile int state這樣的volatile變量,配合Unsafe工具對其原子性的操做來實現對當前鎖狀態進行修改。同步器內部依賴一個FIFO的雙向隊列來完成資源獲取線程的排隊工做。java
2.同步器的應用node
同步器主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態,對同步狀態的修改或者訪問主要經過同步器提供的3個方法:安全
同步器能夠支持獨佔式的獲取同步狀態,也能夠支持共享式的獲取同步狀態,這樣能夠方便實現不一樣類型的同步組件。數據結構
同步器也是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。架構
3.AQS同步隊列併發
同步器AQS內部的實現是依賴同步隊列(一個FIFO的雙向隊列,其實就是數據結構雙向鏈表)來完成同步狀態的管理。app
當前線程獲取同步狀態失敗時,同步器AQS會將當前線程和等待狀態等信息構形成爲一個節點(node)加入到同步隊列,同時會阻塞當前線程;less
當同步狀態釋放的時候,會把首節點中的線程喚醒,使首節點的線程再次嘗試獲取同步狀態。AQS是獨佔鎖和共享鎖的實現的父類。 ide
4.AQS鎖的類別:分爲獨佔鎖和共享鎖兩種。
JUC包中的鎖的包括:Lock接口,ReadWriteLock接口;Condition條件,LockSupport阻塞原語。
AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三個抽象類,
ReentrantLock獨佔鎖,ReentrantReadWriteLock讀寫鎖。CountDownLatch,CyclicBarrier和Semaphore也是經過AQS來實現的。
下面是AQS和使用AQS實現的一些鎖,以及經過AQS實現的一些工具類的架構圖:
圖 1.依賴AQS實現的鎖和工具類
5.AQS同步器的結構:同步器擁有首節點(head)和尾節點(tail)。同步隊列的基本結構以下:
圖 1.同步隊列的基本結構 compareAndSetTail(Node expect,Node update)
圖 2.尾節點的設置 節點加入到同步隊列
圖 3.首節點的設置
6.獨佔式的鎖的獲取:調用同步器的acquire(int arg)方法能夠獲取同步狀態,該方法對中斷不敏感,即線程獲取同步狀態失敗後進入同步隊列,後續對線程進行中斷操做時,線程不會從同步隊列中移除。
(1) 當前線程實現經過tryAcquire()方法嘗試獲取鎖,獲取成功的話直接返回,若是嘗試失敗的話,進入等待隊列排隊等待,能夠保證線程安全(CAS)的獲取同步狀態。
(2) 若是嘗試獲取鎖失敗的話,構造同步節點(獨佔式的Node.EXCLUSIVE),經過addWaiter(Node node,int args)方法,將節點加入到同步隊列的隊列尾部。
(3) 最後調用acquireQueued(final Node node, int args)方法,使該節點以死循環的方式獲取同步狀態,若是獲取不到,則阻塞節點中的線程。acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點的時候才能嘗試獲取鎖(同步狀態)( p == head && tryAcquire(arg))。
緣由是:1.頭結點是成功獲取同步狀態的節點,而頭結點的線程釋放鎖之後,將喚醒後繼節點,後繼節點線程被喚醒後要檢查本身的前驅節點是否爲頭結點。
2.維護同步隊列的FIFO原則,節點進入同步隊列之後,就進入了一個自旋的過程,每一個節點(後者說每一個線程)都在自省的觀察。
下圖爲節點自旋檢查本身的前驅節點是否爲頭結點:
圖 4 節點自旋獲取同步狀態
獨佔式的鎖的獲取源碼:
acquire方法源碼以下
/** * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued(排隊), possibly * repeatedly(反覆的) blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed(傳達) to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * * 獨佔式的獲取同步狀態 * */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
嘗試獲取鎖:tryAcquire方法:若是獲取到了鎖,tryAcquire返回true,反之,返回false。
//方法2: protected final boolean tryAcquire(int acquires) { // 獲取當前線程 final Thread current = Thread.currentThread(); // 獲取「獨佔鎖」的狀態,獲取父類AQS的標誌位 int c = getState(); //c == 0 意思是鎖(同步狀態)沒有被任何線程所獲取 //1.當前線程是不是同步隊列中頭結點Node,若是是的話,則獲取該鎖,設置鎖的狀態,並設置鎖的擁有者爲當前線程 if (c == 0) { if (!hasQueuedPredecessors() &&
// 修改下狀態爲,這裏的acquires的值是1,是寫死的調用子類的lock的方法的時候傳進來的,若是c == 0,compareAndSetState操做會更新成功爲1. compareAndSetState(0, acquires)) {
// 上面CAS操做更新成功爲1,表示當前線程獲取到了鎖,由於將當前線程設置爲AQS的一個變量中,表明這個線程拿走了鎖。 setExclusiveOwnerThread(current); return true; } } //2.若是c不爲0,即狀態不爲0,表示鎖已經被拿走。
//由於ReetrantLock是可重入鎖,是能夠重複lock和unlock的,因此這裏還要判斷一次,獲取鎖的線程是否爲當前請求鎖的線程。 else if (current == getExclusiveOwnerThread()) {
//若是是,state繼續加1,這裏nextc的結果就會 > 1,這個判斷表示獲取到的鎖的線程,還能夠再獲取鎖,這裏就是說的可重入的意思 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
addWaiter方法的源碼:回到aquire方法,若是嘗試獲取同步狀態(鎖)失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),
經過addWaiter(Node node,int args)方法
將該節點加入到同步隊列的隊尾。
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node * * * 若是嘗試獲取同步狀態失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),經過addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。 * */ private Node addWaiter(Node mode) {
// 用當前線程夠着一個Node對象,mode是一個表示Node類型的字段,或者說是這個節點是獨佔的仍是共享的,或者說AQS的這個隊列中,哪些節點是獨佔的,哪些節點是共享的。 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;
//隊列不爲空的時候 if (pred != null) { node.prev = pred; // 確保節點可以被線程安全的添加,使用CAS方法
// 嘗試修改成節點爲最新的節點,若是修改失敗,意味着有併發,這個時候進入enq中的死循環,進行「自旋」的方式修改 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//進入自旋 enq(node); return node; }
enq方法的源碼:同步器經過死循環的方式來保證節點的正確添加,在「死循環」 中經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法中返回,不然
當前線程不斷的嘗試設置。
enq方法將併發添加節點的請求經過CAS變得「串行化」了。
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor * * 同步器經過死循環的方式來保證節點的正確添加,在「死循環」 中經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法中返回,不然當前線程不斷的嘗試設置。 * enq方法將併發添加節點的請求經過CAS變得「串行化」了。 * */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued方法:在隊列中的線程獲取鎖的過程:
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting * * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態(鎖)( p == head && tryAcquire(arg)) * 緣由是:1.頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的線程釋放了同步狀態之後,將會喚醒其後繼節點,後繼節點的線程被喚醒後要檢查本身的前驅節點是否爲頭結點。 * 2.維護同步隊列的FIFO原則,節點進入同步隊列以後,就進入了一個自旋的過程,每一個節點(或者說是每一個線程)都在自省的觀察。 * */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false;
//死循環檢查(自旋檢查)當前節點的前驅節點是否爲頭結點,才能獲取鎖 for (;;) {
// 獲取節點的前驅節點 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//節點中的線程循環的檢查,本身的前驅節點是否爲頭節點
//將當前節點設置爲頭結點,移除以前的頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; }
// 不然檢查前一個節點的狀態,看當前獲取鎖失敗的線程是否要掛起 if (shouldParkAfterFailedAcquire(p, node) &&
//若是須要掛起,藉助JUC包下面的LockSupport類的靜態方法park掛起當前線程,直到被喚醒
parkAndCheckInterrupt()) interrupted = true; } } finally {
//若是有異常 if (failed)
//取消請求,將當前節點從隊列中移除 cancelAcquire(node); } }
獨佔式的獲取同步狀態的流程以下:
圖5 獨佔式的獲取同步狀態的流程
7.獨佔鎖的釋放:下面直接看源碼:
/*
1. unlock():unlock()是解鎖函數,它是經過AQS的release()函數來實現的。 * 在這裏,「1」的含義和「獲取鎖的函數acquire(1)的含義」同樣,它是設置「釋放鎖的狀態」的參數。 * 因爲「公平鎖」是可重入的,因此對於同一個線程,每釋放鎖一次,鎖的狀態-1。 unlock()在ReentrantLock.java中實現的,源碼以下: */ public void unlock() { sync.release(1); }
release()會調用tryRelease方法嘗試釋放當前線程持有的鎖(同步狀態),成功的話喚醒後繼線程,並返回true,不然直接返回false
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} * * * */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
// tryRelease() 嘗試釋放當前線程的同步狀態(鎖) protected final boolean tryRelease(int releases) { //c爲釋放後的同步狀態 int c = getState() - releases; //判斷當前釋放鎖的線程是否爲獲取到鎖(同步狀態)的線程,不是拋出異常(非法監視器狀態異常) if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是鎖(同步狀態)已經被當前線程完全釋放,則設置鎖的持有者爲null,同步狀態(鎖)變的可獲取 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
釋放鎖成功後,找到AQS的頭結點,並喚醒它便可:
// 4. 喚醒頭結點的後繼節點 private void unparkSuccessor(Node node) { //獲取頭結點(線程)的狀態 int ws = node.waitStatus; //若是狀態<0,設置當前線程對應的鎖的狀態爲0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //解釋:Thread to unpark is held in successor, which is normally just the next node. //But if cancelled or apparently(顯然) null, traverse backwards(向後遍歷) from tail to find the actual(實際的) non-cancelled successor(前繼節點). //從隊列尾部開始往前去找最前面的一個waitStatus小於0的節點。 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒後繼節點對應的線程 if (s != null) LockSupport.unpark(s.thread); }
上面說的是ReentrantLock的公平鎖獲取和釋放的AQS的源碼,惟獨還剩下一個非公平鎖NonfairSync沒說,其實,它和公平鎖的惟一區別就是獲取鎖的方式不一樣,公平鎖是按先後順序一次獲取鎖,非公平鎖是搶佔式的獲取鎖,那ReentrantLock中的非公平鎖NonfairSync是怎麼實現的呢?
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
非公平鎖的lock的時候多了上面加粗的代碼:在lock的時候先直接用cas判斷state變量是否爲0(嘗試獲取鎖),成功的話更新成1,表示當前線程獲取到了鎖,不須要在排隊,從而直接搶佔的目的。而對於公平鎖的lock方法是一開始就走AQS的雙向隊列排隊獲取鎖。更詳細的關於ReentrantLock的實現請看後面寫的一篇文章:http://www.cnblogs.com/200911/p/6035765.html
總結:在獲取同步狀態的時候,同步器維護一個同步隊列,獲取失敗的線程會被加入到隊列中並在隊列中自旋;移除隊列(或中止自旋)的條件是前驅節點爲頭結點而且獲取到了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int args)方法釋放同步狀態,而後喚醒頭結點的後繼節點。AQS的實現思路其實並不複雜,用一句話準確的描述的話,其實就是使用標誌狀態位status(volatile int state)和 一個雙向隊列的入隊和出隊來實現。AQS維護一個線程什麼時候訪問的狀態,它只是對狀態負責,而這個狀態的含義,子類能夠本身去定義。
本身註釋的AQS的源碼:以下:
public class AbstractQueuedSynchronizerTest { /** * * (AQS節點的定義,同步隊列的節點定義) * * <p> * 修改歷史: <br> * 修改日期 修改人員 版本 修改內容<br> * -------------------------------------------------<br> * 2016年7月4日 上午10:26:38 user 1.0 初始化建立<br> * </p> * * @author Peng.Li * @version 1.0 * @since JDK1.7 */ static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode * * */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled * 在同步隊列中等待的線程等待超時或者被中斷,須要從同步隊列中取消等待 * */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking(喚醒) * 後繼節點的線程處於等待狀態,而當前的節點若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行。 **/ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition * 節點在等待隊列中,節點的線程等待在Condition上,當其餘線程對Condition調用了signal()方法後,該節點會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 **/ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally(無條件的) propagate(傳播) * * 表示下一次共享式同步狀態獲取將會被無條件地傳播下去 */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated(傳播) to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened(干涉). * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). * * 使用CAS更改狀態,volatile保證線程可見性,即被一個線程修改後,狀態會立馬讓其餘線程可見。 * */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueing(入隊), and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. * * 前驅節點,當前節點加入到同步隊列中被設置 */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. * * 後繼節點 */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. * * 獲取同步狀態的線程 */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive(獨有的) mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred(移動到) to the queue(同步隊列) to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. * * 等待隊列中的後繼節點,若是當前節點是共享的,那麼這個字段是一個SHARED常量, * 也就是說節點類型(獨佔和共享)和等待隊列中的後繼節點共用同一個字段。 */ Node nextWaiter; /** * Returns true if node is waiting in shared mode */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * Head of the wait queue, lazily initialized. Except for (除...之外) * initialization(初始化), it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED.(若是head引用已經存在,等待狀態保證不會被取消) */ private transient volatile Node head; /** * Tail of the wait queue(等待隊列), lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. * 同步狀態,線程可見的,共享內存裏面保存 * */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value * * 獲得同步狀態的值 * */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued(排隊), possibly * repeatedly(反覆的) blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed(傳達) to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * * 獨佔式的獲取同步狀態 * */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node * * * 若是嘗試獲取同步狀態失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),經過 addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。 * */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 確保節點可以被安全的添加 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Convenience method to interrupt current thread. * 分析:若是在acquireQueued()中,當前線程被中斷過,則執行selfInterrupt();不然不會執行。 * 線程在阻塞狀態被「中斷喚醒」而獲取CPU的執行權;可是該線程前面還有其餘等待鎖的線程,根據公平性原則,該線程仍然沒法獲取到鎖,他會再次阻塞。 * 直到該線程被他前面等待鎖的線程喚醒;線程纔會獲取鎖。該線程「成功獲取鎖並真正執行起來以前」,他的中斷會被忽略而且中斷標記會被清除,由於在parkAndCheckInterrupt()中, * 咱們線程的中斷狀態時調用了Thread.interrupted(),這個函數在返回中斷狀態以後,還會清除中斷狀態,正由於清除了中斷狀態,因此在selfInterrupt從新產生一箇中斷。 * * * 當前線程本身產生一箇中斷 */ private static void selfInterrupt() { Thread.currentThread().interrupt(); } /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting * * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態( p == head && tryAcquire(arg)) * 緣由是:1.頭結點是成功獲取同步狀態的節點,而頭節點的線程釋放了同步狀態之後,將會喚醒其後繼節點,後繼節點的線程被喚醒後要檢查本身的前驅節點是否爲頭結點。 * 2.維護同步隊列的FIFO原則,節點進入同步隊列以後,就進入了一個自旋的過程,每一個節點(或者說是每一個線程)都在自省的觀察。 * */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor * * 同步器經過死循環的方式來保證節點的正確添加,在「死循環」 中經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法中返回,不然當前線程不斷的嘗試設置。 * enq方法將併發添加節點的請求經過CAS變得「串行化」了。 * */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted * * 阻塞當前線程 * */ private final boolean parkAndCheckInterrupt() { // 阻塞當前線程 LockSupport.park(this); // 線程被喚醒以後的中斷狀態 return Thread.interrupted(); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} * * 釋放公平鎖 * */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * Attempts to set the state to reflect a release in exclusive * mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block * 返回當前線程是否應該阻塞 * * 說明: (01) 關於waitStatus請參考下表(中擴號內爲waitStatus的值),更多關於waitStatus的內容,能夠參考前面的Node類的介紹。 CANCELLED[1] -- 當前線程已被取消 SIGNAL[-1] -- 「當前線程的後繼線程須要被unpark(喚醒)」。通常發生狀況是:當前線程的後繼線程處於阻塞狀態,而當前線程被release或cancel掉,所以須要喚醒當前線程的後繼線程。 CONDITION[-2] -- 當前線程(處在Condition休眠狀態)在等待Condition喚醒 PROPAGATE[-3] -- (共享鎖)其它線程獲取到「共享鎖」 [0] -- 當前線程不屬於上面的任何一種狀態。 (02) shouldParkAfterFailedAcquire()經過如下規則,判斷「當前線程」是否須要被阻塞。 規則1:若是前繼節點狀態爲SIGNAL,代表當前節點須要被unpark(喚醒),此時則返回true。 規則2:若是前繼節點狀態爲CANCELLED(ws>0),說明前繼節點已經被取消,則經過先前回溯找到一個有效(非CANCELLED狀態)的節點,並返回false。 規則3:若是前繼節點狀態爲非SIGNAL、非CANCELLED,則設置前繼的狀態爲SIGNAL,並返回false。 * */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驅節點的狀態 int ws = pred.waitStatus; // 若是前驅節點是SIGNAL狀態,則意味着當前線程須要unpark喚醒,此時返回true if (ws == Node.SIGNAL) /* * This node has already set status asking a release to signal it, so it can safely park. */ return true; // 若是前繼節點是取消的狀態,則設置當前節點的「當前前繼節點爲」原節點的前繼節點 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure * it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling. It is OK if this fails or if * status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally just the next node. But if cancelled or apparently null, traverse * backwards from tail to find the actual non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } /** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a <tt>volatile</tt> read * and write. * * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /** * CAS waitStatus field of a node. */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * CAS next field of a node. */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * Setup to support compareAndSet. We need to natively implement * this here: For the sake of permitting future enhancements, we * cannot explicitly subclass AtomicInteger, which would be * efficient and useful otherwise. So, as the lesser of evils, we * natively implement using hotspot intrinsics(編譯器內部函數) API. And while we * are at it, we do the same for other CASable fields (which could * otherwise be done with atomic field updaters). */ private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } }
AbstractOwnableSynchronizer的源碼以下:
package concurrentMy.aqs; /** * * (設置和獲取鎖的持有者線程) * * <p> * 修改歷史: <br> * 修改日期 修改人員 版本 修改內容<br> * -------------------------------------------------<br> * 2016年7月5日 下午3:42:37 user 1.0 初始化建立<br> * </p> * * @author Peng.Li * @version 1.0 * @since JDK1.7 */ public abstract class AbstractOwnableSynchronizerTest implements java.io.Serializable { /** Use serial ID even though all fields transient. */ private static final long serialVersionUID = 3737899427754241961L; /** * Empty constructor for use by subclasses. */ protected AbstractOwnableSynchronizerTest() { } /** * The current owner of exclusive mode synchronization. * * 加 transient 表示exclusiveOwnerThread不能被串行化,不會被做爲序列化的一部分 * * 鎖的持有線程 */ private transient Thread exclusiveOwnerThread; /** * Sets the thread that currently owns exclusive access. A * <tt>null</tt> argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * <tt>volatile</tt> field accesses. * * protected final來修飾,表示子類可使用這個方法,可是不能重載這個方法,也就是不能修改這個方法 */ protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } /** * Returns the thread last set by * <tt>setExclusiveOwnerThread</tt>, or <tt>null</tt> if never * set. This method does not otherwise impose any synchronization * or <tt>volatile</tt> field accesses. * @return the owner thread */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
參考文章:
1.Doug Lea的論文: http://gee.cs.oswego.edu/dl/papers/aqs.pdf
2. 深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的實現分析(上): http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
3. 深度解析Java 8:AbstractQueuedSynchronizer的實現分析(下): http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer
4. 深刻淺出 Java Concurrency (7): 鎖機制 part 2 AQS: http://www.blogjava.net/xylz/archive/2010/07/06/325390.html
5 AQS:http://www.cnblogs.com/leesf456/p/5350186.html
6.參考:https://tech.meituan.com/Java_Lock.html