一行一行源碼分析清楚AbstractQueuedSynchronizerjava
轉自https://www.javadoop.com/post...node
在分析 Java 併發包 java.util.concurrent 源碼的時候,少不了須要瞭解 AbstractQueuedSynchronizer(如下簡寫AQS)這個抽象類,由於它是 Java 併發包的基礎工具類,是實現 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等類的基礎。程序員
Google 一下 AbstractQueuedSynchronizer,咱們能夠找到不少關於 AQS 的介紹,可是不少都沒有介紹清楚,由於大部分文章沒有把其中的一些關鍵的細節說清楚。web
本文將從 ReentrantLock 的公平鎖源碼出發,分析下 AbstractQueuedSynchronizer 這個類是怎麼工做的,但願能給你們提供一些簡單的幫助。面試
申明如下幾點:spring
廢話結束,開始。數據庫
此篇博客全部源碼均來自JDK 1.8
AQS內部維護着一個FIFO隊列,該隊列就是CLH同步隊列。編程
CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理,當前線程若是獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構形成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。安全
在CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),其定義以下:微信
static final class Node { /** 共享 */ static final Node SHARED = new Node(); /** 獨佔 */ static final Node EXCLUSIVE = null; /** * 由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態; */ static final int CANCELLED = 1; /** * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行 */ static final int SIGNAL = -1; /** * 節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,改節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步狀態獲取將會無條件地傳播下去 */ static final int PROPAGATE = -3; /** 等待狀態 */ volatile int waitStatus; /** 前驅節點 */ volatile Node prev; /** 後繼節點 */ volatile Node next; /** 獲取同步狀態的線程 */ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
CLH同步隊列結構圖以下:
學了數據結構的咱們,CLH隊列入列是再簡單不過了,無非就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。代碼咱們能夠看看addWaiter(Node node)方法:
private Node addWaiter(Node mode) { //新建Node Node node = new Node(Thread.currentThread(), mode); //快速嘗試添加尾節點 Node pred = tail; if (pred != null) { node.prev = pred; //CAS設置尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //屢次嘗試 enq(node); return node; }
addWaiter(Node node)先經過快速嘗試設置尾節點,若是失敗,則調用enq(Node node)方法設置尾節點
private Node enq(final Node node) { //屢次嘗試,直到成功爲止 for (;;) { Node t = tail; //tail不存在,設置爲首節點 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //設置爲尾節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在上面代碼中,兩個方法都是經過一個CAS方法compareAndSetTail(Node expect, Node update)來設置尾節點,該方法能夠確保節點是線程安全添加的。在enq(Node node)方法中,AQS經過「死循環」的方式來保證節點能夠正確添加,只有成功添加後,當前線程纔會從該方法返回,不然會一直執行下去。
過程圖以下:
CLH同步隊列遵循FIFO,首節點的線程釋放同步狀態後,將會喚醒它的後繼節點(next),然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,這個過程很是簡單,head執行該節點並斷開原首節點的next和當前節點的prev便可,注意在這個過程是不須要使用CAS來保證的,由於只有一個線程可以成功獲取到同步狀態。過程圖以下:
先來看看 AQS 有哪些屬性,搞清楚這些基本就知道 AQS 是什麼套路了,畢竟能夠猜嘛!
// 頭結點,你直接把它當作 當前持有鎖的線程 多是最好理解的 private transient volatile Node head; // 阻塞的尾節點,每一個新的節點進來,都插入到最後,也就造成了一個隱視的鏈表 private transient volatile Node tail; // 這個是最重要的,不過也是最簡單的,表明當前鎖的狀態,0表明沒有被佔用,大於0表明有線程持有當前鎖 // 之因此說大於0,而不是等於1,是由於鎖能夠重入嘛,每次重入都加上1 private volatile int state; // 表明當前持有獨佔鎖的線程,舉個最重要的使用例子,由於鎖能夠重入 // reentrantLock.lock()能夠嵌套調用屢次,因此每次用這個來判斷當前線程是否已經擁有了鎖 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
怎麼樣,看樣子應該是很簡單的吧,畢竟也就四個屬性啊。
AbstractQueuedSynchronizer 的等待隊列示意以下所示,注意了,以後分析過程當中所說的 queue,也就是阻塞隊列不包含 head,不包含 head,不包含 head。
轉存失敗從新上傳取消
等待隊列中每一個線程被包裝成一個 node,數據結構是鏈表,一塊兒看看源碼吧:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ // 標識節點當前在共享模式下 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 標識節點當前在獨佔模式下 static final Node EXCLUSIVE = null; // ======== 下面的幾個int常量是給waitStatus用的 =========== /** waitStatus value to indicate thread has cancelled */ // 代碼此線程取消了爭搶這個鎖 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // 官方的描述是,其表示當前node的後繼節點對應的線程須要被喚醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 本文不分析condition,因此略過吧,下一篇文章會介紹這個 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 一樣的不分析,略過吧 static final int PROPAGATE = -3; // ===================================================== // 取值爲上面的一、-一、-二、-3,或者0(之後會講到) // 這麼理解,暫時只須要知道若是這個值 大於0 表明此線程取消了等待, // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是能夠指定timeouot的。。。 volatile int waitStatus; // 前驅節點的引用 volatile Node prev; // 後繼節點的引用 volatile Node next; // 這個就是線程本尊 volatile Thread thread; }
Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next 四個屬性而已,你們先要有這個概念在內心。
上面的是基礎知識,後面會屢次用到,內心要時刻記着它們,內心想着這個結構圖就能夠了。下面,咱們開始說 ReentrantLock 的公平鎖。多嘴一下,我說的阻塞隊列不包含 head 節點。
轉存失敗從新上傳取消
首先,咱們先看下 ReentrantLock 的使用方式。
// 我用個web開發中的service概念吧 public class OrderService { // 使用static,這樣每一個線程拿到的是同一把鎖,固然,spring mvc中service默認就是單例,別糾結這個 private static ReentrantLock reentrantLock = new ReentrantLock(true); public void createOrder() { // 好比咱們同一時間,只容許一個線程建立訂單 reentrantLock.lock(); // 一般,lock 以後緊跟着 try 語句 try { // 這塊代碼同一時間只能有一個線程進來(獲取到鎖的線程), // 其餘的線程在lock()方法上阻塞,等待獲取到鎖,再進來 // 執行代碼... // 執行代碼... // 執行代碼... } finally { // 釋放鎖 reentrantLock.unlock(); } } }
ReentrantLock 在內部用了內部類 Sync 來管理鎖,因此真正的獲取鎖和釋放鎖是由 Sync 的實現類來控制的。
abstract static class Sync extends AbstractQueuedSynchronizer { }
Sync 有兩個實現,分別爲 NonfairSync(非公平鎖)和 FairSync(公平鎖),咱們看 FairSync 部分。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
不少人確定開始嫌棄上面廢話太多了,下面跟着代碼走,我就不廢話了。
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 爭鎖 final void lock() { acquire(1); } // 來自父類AQS,我直接貼過來這邊,下面分析的時候一樣會這樣作,不會給讀者帶來閱讀壓力 // 咱們看到,這個方法,若是tryAcquire(arg) 返回true, 也就結束了。 // 不然,acquireQueued方法會將線程壓到隊列中 public final void acquire(int arg) { // 此時 arg == 1 // 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試 // 由於有可能直接就成功了呢,也就不須要進隊列排隊了, // 對於公平鎖的語義就是:原本就沒人持有鎖,根本不必進隊列等待(又是掛起,又是等待被喚醒的) if (!tryAcquire(arg) && // tryAcquire(arg)沒有成功,這個時候須要把當前線程掛起,放到阻塞隊列中。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ // 嘗試直接獲取鎖,返回值是boolean,表明是否獲取到鎖 // 返回true:1.沒有線程在等待鎖;2.重入鎖,線程原本就持有鎖,也就能夠理所固然能夠直接獲取 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state == 0 此時此刻沒有線程持有鎖 if (c == 0) { // 雖然此時此刻鎖是能夠用的,可是這是公平鎖,既然是公平,就得講究先來後到, // 看看有沒有別人在隊列中等了半天了 if (!hasQueuedPredecessors() && // 若是沒有線程在等待,那就用CAS嘗試一下,成功了就獲取到鎖了, // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 =_= // 由於剛剛還沒人的,我判斷過了
更多內容請關注微信公衆號【Java技術江湖】
一位阿里 Java 工程師的技術小站。做者黃小斜,專一 Java 相關技術:SSM、SpringBoot、MySQL、分佈式、中間件、集羣、Linux、網絡、多線程,偶爾講點Docker、ELK,同時也分享技術乾貨和學習經驗,致力於Java全棧開發!(關注公衆號後回覆」Java「便可領取 Java基礎、進階、項目和架構師等免費學習資料,更有數據庫、分佈式、微服務等熱門技術學習視頻,內容豐富,兼顧原理和實踐,另外也將贈送做者原創的Java學習指南、Java程序員面試指南等乾貨資源)