本文經過ReentrantLock來窺探AbstractQueuedSynchronizer(AQS)的實現原理,在看此文以前。你須要瞭解一下park、unpark的功能,請移步至上一篇《深刻剖析park、unpark》;html
根據AbstractQueuedSynchronizer的官方文檔,若是想實現一把鎖的,須要繼承AbstractQueuedSynchronizer,並須要重寫tryAcquire、tryRelease、可選擇重寫isHeldExclusively提供locked state、由於支持序列化,因此須要重寫readObject以便反序列化時恢復原始值、newCondition提供條件;官方提供的java代碼以下(官方文檔見參考鏈接);java
public class MyLock implements Lock, java.io.Serializable { private static class Sync extends AbstractQueuedSynchronizer { // Acquires the lock if state is zero @Override public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero @Override protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // Provides a Condition Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } // Reports whether in locked state @Override protected boolean isHeldExclusively() { return getState() == 1; } } /** * The sync object does all the hard work. We just forward to it. */ private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } private static volatile Integer value = 0; public static void main(String[] args) { MyLock myLock = new MyLock(); for (int i = 0; i < 1000; i++) { new Thread(()->{ myLock.lock(); value ++; myLock.unlock(); }).start(); } System.out.println(value); } }
上面是一個不可重入的鎖,它實現了一個鎖基礎功能,目的是爲了跟ReentrantLock的實現作對比;node
ReentrantLock意思爲可重入鎖,指的是一個線程可以對一個臨界資源重複加鎖。ReentrantLock跟經常使用的Synchronized進行比較;c#
Synchronized的分析能夠參考《深刻剖析synchronized關鍵詞》,ReentrantLock能夠建立公平鎖、也能夠建立非公平鎖,接下來看一下ReentrantLock的簡單用法,非公平鎖實現比較簡單,今天重點是公平鎖;api
public class ReentrantLockTest { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(true); reentrantLock.lock(); try { log.info("lock"); } catch (Exception e) { log.error(e); } finally { reentrantLock.unlock(); log.info("unlock"); } } }
先看一下加鎖方法lock安全
非公平鎖lock方法數據結構
compareAndSetState很好理解,經過CAS加鎖,若是加鎖失敗調用acquire;架構
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
final void lock() { acquire(1); }
線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續,分析實現原理oracle
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
總結:公平鎖的上鎖是必須判斷本身是否是須要排隊;而非公平鎖是直接進行CAS修改計數器看能不能加鎖成功;若是加鎖不成功則乖乖排隊(調用acquire);因此無論公平仍是不公平;只要進到了AQS隊列當中那麼他就會排隊;app
美團畫的AQS的架構圖,很詳細,當有自定義同步器接入時,只需重寫第一層所須要的部分方法便可,不須要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操做時,先通過第一層的API進入AQS內部方法,而後通過第二層進行鎖的獲取,接着對於獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴於第五層的基礎數據提供層。
AQS核心思想是,若是被請求的共享資源空閒,那麼就將當前請求資源的線程設置爲有效的工做線程,將共享資源設置爲鎖定狀態;若是共享資源被佔用,就須要必定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是CLH隊列的變體實現的,將暫時獲取不到鎖的線程加入到隊列中。
CLH:Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是經過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。
加鎖:
解鎖:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } }
acquire方法首先會調tryAcquire方法,須要注意的是tryAcquire的結果作取反;根據前面分析,tryAcquire會調用子類的實現,ReentrantLock有兩個內部類,FairSync,NonfairSync,都繼承自Sync,Sync繼承AbstractQueuedSynchronizer;
實現方式差異在是否有hasQueuedPredecessors() 的判斷條件
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲取lock對象的上鎖狀態,若是鎖是自由狀態則=0,若是被上鎖則爲1,大於1表示重入 int c = getState(); if (c == 0) { // hasQueuedPredecessors,判斷本身是否須要排隊 // 下面我會單獨介紹,若是不須要排隊則進行cas嘗試加鎖 // 若是加鎖成功則把當前線程設置爲擁有鎖的線程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 若是C不等於0,可是當前線程等於擁有鎖的線程則表示這是一次重入,那麼直接把狀態+1表示重入次數+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
非公平鎖
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
先來看下AQS中最基本的數據結構——Node,Node即爲上面CLH變體隊列中的節點。
static final class Node { static final Node SHARED = new Node(); // 表示線程以共享的模式等待鎖 static final Node EXCLUSIVE = null; // 表示線程正在以獨佔的方式等待鎖 static final int CANCELLED = 1; // 表示線程獲取鎖的請求已經取消了 static final int SIGNAL = -1; // 表示線程已經準備好了,就等資源釋放了 static final int CONDITION = -2; // 表示節點在等待隊列中,節點線程等待喚醒 static final int PROPAGATE = -3; // 當前線程處在SHARED狀況下,該字段纔會使用 volatile int waitStatus; // 當前節點在隊列中的狀態 volatile Node prev; // 前驅指針 volatile Node next; // 後繼指針 volatile Thread thread; // 表示處於該節點的線程 Node nextWaiter; // 指向下一個處於CONDITION狀態的節點 final boolean isShared() { return nextWaiter == SHARED; } // 返回前驅節點,沒有的話拋出npe final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
再看hasQueuedPredecessors,整個方法若是最後返回false,則去加鎖,若是返回true則不加鎖,由於這個方法被取反操做;hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。若是返回False,說明當前線程能夠爭取共享資源;若是返回True,說明隊列中存在有效節點,當前線程必須加入到等待隊列中。
雙向鏈表中,第一個節點爲虛節點,其實並不存儲任何信息,只是佔位。真正的第一個有數據的節點,是在第二個節點開始的。
若是這上面沒有看懂,沒有關係,先來分析一下構建整個隊列的過程;
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail爲對尾,賦值給pred Node pred = tail; // 判斷pred是否爲空,其實就是判斷對尾是否有節點,其實只要隊列被初始化了對尾確定不爲空 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
用一張圖來分析一下,整個隊列構建過程;
(1)經過Node(Thread thread, Node mode) 方法構建一個node節點(node2),此時的nextWaiter爲空,線程不爲空,是當前線程;
(2)若是隊尾爲空,則說明隊列未創建,調用enq構建第一個虛擬節點(node1),經過compareAndSetHead方法構建一個頭節點,須要注意的是該頭節點thread是null,後續不少都是用線程是否爲null來判讀是否爲第一個虛擬節點;
(3)將node1 cas設置爲head
(4)將頭節點賦值爲tail = head
(5)進入下一次for循環時,會走到else分支,會將傳入的node的指向頭部節點的next,此時node2的prev指向node1(tail)
(6)將node2 cas設置爲tail;
(7)將node2指向node1的next;
通過上面的步驟,就構建了一個長度爲2的隊列;
添加第二個隊列時,走的是這段代碼,流程就簡單多了,代碼以下
if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());由於整個構建過程並非原子操做,因此這個條件判斷,如今再是否是就看明白了?
addWaiter方法其實就是把對應的線程以Node的數據結構形式加入到雙端隊列裏,返回的是一個包含該線程的Node。而這個Node會做爲參數,進入到acquireQueued方法中。acquireQueued方法能夠對排隊中的線程進行「獲鎖」操做。總的來講,一個線程獲取鎖失敗了,被放入等待隊列,acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者再也不須要獲取(中斷)。
下面經過代碼從「什麼時候出隊列?」和「如何出隊列?」兩個方向來分析一下acquireQueued源碼:
final boolean acquireQueued(final Node node, int arg) { // 標記是否成功拿到資源 boolean failed = true; try { // 標記等待過程當中是否中斷過 boolean interrupted = false; for (;;) { // 獲取當前節點的前驅節點,有兩種狀況;一、上一個節點爲頭部;2上一個節點不爲頭部 final Node p = node.predecessor(); // 若是p是頭結點,說明當前節點在真實數據隊列的首部,就嘗試獲取鎖(頭結點是虛節點) // 由於第一次tryAcquire判斷是否須要排隊,若是須要排隊,那麼我就入隊,此處再重試一次 if (p == head && tryAcquire(arg)) { // 獲取鎖成功,頭指針移動到當前node setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 說明p爲頭節點且當前沒有獲取到鎖(多是非公平鎖被搶佔了)或者是p不爲頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus爲-1),防止無限循環浪費資源。具體兩個方法下面細細分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 成功拿到資源,準備釋放 if (failed) cancelAcquire(node); } }
設置當前節點爲頭節點,而且將node.thread爲空(剛纔提到判斷是否爲頭部虛擬節點的條件就是node.thread == null。waitStatus狀態併爲修改,等下咱們再分析;
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
接下來看shouldParkAfterFailedAcquire代碼,須要注意的是,每個新建立Node的節點是被下一個排隊的node設置爲等待狀態爲SIGNAL, 這裏比較難以理解爲何須要去改變上一個節點的park狀態?
每一個node都有一個狀態,默認爲0,表示無狀態,-1表示在park;當時不能本身把本身改爲-1狀態?由於你得肯定你本身park了纔是能改成-1;因此只能先park;在改狀態;可是問題你本身都park了;徹底釋放CPU資源了,故而沒有辦法執行任何代碼了,因此只能別人來改;故而能夠看到每次都是本身的後一個節點把本身改爲-1狀態;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取前驅節點的狀態 int ws = pred.waitStatus; // 說明頭結點處於喚醒狀態 if (ws == Node.SIGNAL) return true; // static final int CANCELLED = 1; // 表示線程獲取鎖的請求已經取消了 // static final int SIGNAL = -1; // 表示線程已經準備好了,就等資源釋放了 // static final int CONDITION = -2; // 表示節點在等待隊列中,節點線程等待喚醒 // static final int PROPAGATE = -3; // 當前線程處在SHARED狀況下,該字段纔會使用 if (ws > 0) { do { // 把取消節點從隊列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 設置前任節點等待狀態爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
調用LockSupport.park掛起當前線程,本身已經park,沒法再修改狀態了!
private final boolean parkAndCheckInterrupt() { // 調⽤用park()使線程進⼊入waiting狀態 LockSupport.park(this); // 若是被喚醒,查看⾃自⼰己是不不是被中斷的,這⾥裏裏先清除⼀下標記位 return Thread.interrupted(); }
shouldParkAfterFailedAcquire的整個流程仍是比較清晰的,若是不清楚,能夠參考美團畫的流程圖;
經過上面的分析,當failed爲true時,也就意味着park結束,線程被喚醒了,for循環已經跳出,開始執行cancelAcquire,經過cancelAcquire方法,將Node的狀態標記爲CANCELLED;代碼以下:
private void cancelAcquire(Node node) { // 將無效節點過濾 if (node == null) return; // 設置該節點不關聯任何線程,也就是虛節點(上面已經提到,node.thread = null是判讀是不是頭節點的條件) node.thread = null; Node pred = node.prev; // 經過前驅節點,處理waitStatus > 0的node while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 把當前node的狀態設置爲CANCELLED,當下一個node排隊結束時,本身就會被上一行代碼處理掉; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; // 若是當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置爲尾節點,更新失敗的話,則進入else,若是更新成功,將tail的後繼節點設置爲null if (node == tail && compareAndSetTail(node, pred)) { // 把本身設置爲null compareAndSetNext(pred, predNext, null); } else { int ws; // 若是當前節點不是head的後繼節點 // 1:判斷當前節點前驅節點的是否爲SIGNAL // 2:若是不是,則把前驅節點設置爲SINGAL看是否成功 // 若是1和2中有一個爲true,再判斷當前節點的線程是否爲null // 若是上述條件都知足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 若是當前節點是head的後繼節點,或者上述條件不知足,那就喚醒當前節點的後繼節點 unparkSuccessor(node); } node.next = node; // help GC } }
當前的流程:
獲取當前節點的前驅節點,若是前驅節點的狀態是CANCELLED,那就一直往前遍歷,找到第一個waitStatus <= 0的節點,將找到的Pred節點和當前Node關聯,將當前Node設置爲CANCELLED。
根據當前節點的位置,考慮如下三種狀況:
(1) 當前節點是尾節點。
(2) 當前節點是Head的後繼節點。
(3) 當前節點不是Head的後繼節點,也不是尾節點。
(1)當前節點時尾節點
(2)當前節點是Head的後繼節點。
這張圖描述的是這段代碼:unparkSuccessor
Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
(3)當前節點不是Head的後繼節點,也不是尾節點。
這張圖描述的是這段代碼跟(2)同樣;
經過上面的圖,你會發現全部的變化都是對Next指針進行了操做,而沒有對Prev指針進行操做,緣由是執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),也就是下圖中代碼1和代碼2直接的間隙就會出現這種狀況,此時修改Prev指針,有可能會致使Prev指向另外一個已經移除隊列的Node,所以這塊變化Prev指針不安全。
解鎖時並不區分公平和不公平,由於ReentrantLock實現了鎖的可重入,能夠進一步的看一下時如何處理的,上代碼:
public void unlock() { sync.release(1); }
public final boolean release(int arg) { // 自定義的tryRelease若是返回true,說明該鎖沒有被任何線程持有 if (tryRelease(arg)) { // 獲取頭結點 Node h = head; if (h != null && h.waitStatus != 0) // 頭結點不爲空而且頭結點的waitStatus不是初始化節點狀況,解除線程掛起狀態 unparkSuccessor(h); return true; } return false; }
這裏的判斷條件爲何是h != null && h.waitStatus != 0
protected final boolean tryRelease(int releases) { // 減小可重入次數,setState(c); int c = getState() - releases; // 當前線程不是持有鎖的線程,拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 若是持有線程所有釋放,將當前獨佔鎖全部線程設置爲null,並更新state if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
這個方法在cancelAcquire其實也用到了,簡單分析一下
// 若是當前節點是head的後繼節點,或者上述條件不知足,就喚醒當前節點的後繼節點unparkSuccessor(node);
private void unparkSuccessor(Node node) { // 獲取結點waitStatus,CAS設置狀態state=0 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取當前節點的下一個節點 Node s = node.next; // 若是下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點 if (s == null || s.waitStatus > 0) { s = null; // 就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 若是當前節點的下個節點不爲空,並且狀態<=0,就把當前節點unpark if (s != null) LockSupport.unpark(s.thread); }
爲何要從後往前找第一個非Cancelled的節點呢?
緣由1:addWaiter方法並不是原子,構建鏈表結構時以下圖中 一、2間隙執行unparkSuccessor,此時鏈表是不完整的,沒辦法從前日後找了;
緣由2:還有一點緣由,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,所以也是必需要從後往前遍歷纔可以遍歷徹底部的Node;
喚醒後,會執行return Thread.interrupted();,這個函數返回的是當前執行線程的中斷狀態,並清除。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
acquireQueued代碼,當parkAndCheckInterrupt返回True或者False的時候,interrupted的值不一樣,但都會執行下次循環。若是這個時候獲取鎖成功,就會把當前interrupted返回。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
若是acquireQueued爲True,就會執行selfInterrupt方法。
該方法實際上是爲了中斷線程。但爲何獲取了鎖之後還要中斷線程呢?這部分屬於Java提供的協做式中斷知識內容,感興趣同窗能夠查閱一下。這裏簡單介紹一下:
這裏的處理方式主要是運用線程池中基本運做單元Worder中的runWorker,經過Thread.interrupted()進行額外的判斷處理,能夠看下ThreadPoolExecutor源碼的判斷條件;
AQS在JUC中有⽐比較⼴普遍的使⽤用,如下是主要使⽤用的地⽅方:
至此,經過ReentrantLock分析AQS的實現原理一家完畢,須要說明的是,此文深度參考了美團分析的ReentrantLock,是參考連接的第三個,有興趣能夠對比差別,感謝!