Java併發指南7:JUC的核心類AQS詳解

一行一行源碼分析清楚AbstractQueuedSynchronizerjava

轉自https://www.javadoop.com/post...node

在分析 Java 併發包 java.util.concurrent 源碼的時候,少不了須要瞭解 AbstractQueuedSynchronizer(如下簡寫AQS)這個抽象類,由於它是 Java 併發包的基礎工具類,是實現 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等類的基礎。程序員

Google 一下 AbstractQueuedSynchronizer,咱們能夠找到不少關於 AQS 的介紹,可是不少都沒有介紹清楚,由於大部分文章沒有把其中的一些關鍵的細節說清楚。web

本文將從 ReentrantLock 的公平鎖源碼出發,分析下 AbstractQueuedSynchronizer 這個類是怎麼工做的,但願能給你們提供一些簡單的幫助。面試

申明如下幾點:spring

  1. 本文有點長,可是很簡單很簡單很簡單,主要面向讀者對象爲併發編程的初學者,或者想要閱讀java併發包源碼的開發者。
  2. 建議在電腦上閱讀,若是你想好好地理解全部的細節,並且你歷來沒看過相關的分析,你可能至少須要 20 分鐘仔細看全部的描述,本文後面的 1/3 以上很簡單,前面的 1/4 更簡單,中間的部分要好好看。
  3. 若是你不知道爲何要看這個,我想告訴你,即便你看懂了全部的細節,你可能也不能把你的業務代碼寫得更好
  4. 源碼環境 JDK1.7,看到不懂或有疑惑的部分,最好能本身打開源碼看看。Doug Lea 大神的代碼寫得真心不錯。
  5. 有不少英文註釋我沒有刪除,這樣讀者能夠參考着英文說的來,萬一被我忽悠了呢
  6. 本文不分析共享模式,這樣能夠給讀者減小不少負擔,只要把獨佔模式看懂,共享模式讀者應該就能夠順着代碼看懂了。並且也不分析 condition 部分,因此應該說很容易就能夠看懂了。
  7. 本文大量使用咱們平時用得最多的 ReentrantLock 的概念,本質上來講是不正確的,讀者應該清楚,AQS 不只僅用來實現鎖,只是但願讀者能夠用鎖來聯想 AQS 的使用場景,下降讀者的閱讀壓力
  8. ReentrantLock 的公平鎖和非公平鎖只有一點點區別,沒有任何閱讀壓力
  9. 你須要提早知道什麼是 CAS(CompareAndSet)

廢話結束,開始。數據庫

CLH隊列

此篇博客全部源碼均來自JDK 1.8

AQS內部維護着一個FIFO隊列,該隊列就是CLH同步隊列。編程

CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理,當前線程若是獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構形成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。安全

在CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),其定義以下:微信

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();

    /** 獨佔 */
    static final Node EXCLUSIVE = null;

    /**
     * 由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態;
     */
    static final int CANCELLED =  1;

    /**
     * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行
     */
    static final int SIGNAL    = -1;

    /**
     * 節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,改節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中
     */
    static final int CONDITION = -2;

    /**
     * 表示下一次共享式同步狀態獲取將會無條件地傳播下去
     */
    static final int PROPAGATE = -3;

    /** 等待狀態 */
    volatile int waitStatus;

    /** 前驅節點 */
    volatile Node prev;

    /** 後繼節點 */
    volatile Node next;

    /** 獲取同步狀態的線程 */
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

CLH同步隊列結構圖以下:

轉存失敗從新上傳取消201701240001

入列

學了數據結構的咱們,CLH隊列入列是再簡單不過了,無非就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。代碼咱們能夠看看addWaiter(Node node)方法:

private Node addWaiter(Node mode) {
        //新建Node
        Node node = new Node(Thread.currentThread(), mode);
        //快速嘗試添加尾節點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS設置尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //屢次嘗試
        enq(node);
        return node;
    }

addWaiter(Node node)先經過快速嘗試設置尾節點,若是失敗,則調用enq(Node node)方法設置尾節點

private Node enq(final Node node) {
        //屢次嘗試,直到成功爲止
        for (;;) {
            Node t = tail;
            //tail不存在,設置爲首節點
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //設置爲尾節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面代碼中,兩個方法都是經過一個CAS方法compareAndSetTail(Node expect, Node update)來設置尾節點,該方法能夠確保節點是線程安全添加的。在enq(Node node)方法中,AQS經過「死循環」的方式來保證節點能夠正確添加,只有成功添加後,當前線程纔會從該方法返回,不然會一直執行下去。

過程圖以下:

轉存失敗從新上傳取消1485225206860201701240002

出列

CLH同步隊列遵循FIFO,首節點的線程釋放同步狀態後,將會喚醒它的後繼節點(next),然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,這個過程很是簡單,head執行該節點並斷開原首節點的next和當前節點的prev便可,注意在這個過程是不須要使用CAS來保證的,由於只有一個線程可以成功獲取到同步狀態。過程圖以下:

轉存失敗從新上傳取消201701240003

AQS 結構

先來看看 AQS 有哪些屬性,搞清楚這些基本就知道 AQS 是什麼套路了,畢竟能夠猜嘛!

// 頭結點,你直接把它當作 當前持有鎖的線程 多是最好理解的
private transient volatile Node head;
// 阻塞的尾節點,每一個新的節點進來,都插入到最後,也就造成了一個隱視的鏈表
private transient volatile Node tail;
// 這個是最重要的,不過也是最簡單的,表明當前鎖的狀態,0表明沒有被佔用,大於0表明有線程持有當前鎖
// 之因此說大於0,而不是等於1,是由於鎖能夠重入嘛,每次重入都加上1
private volatile int state;
// 表明當前持有獨佔鎖的線程,舉個最重要的使用例子,由於鎖能夠重入
// reentrantLock.lock()能夠嵌套調用屢次,因此每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer

怎麼樣,看樣子應該是很簡單的吧,畢竟也就四個屬性啊。

AbstractQueuedSynchronizer 的等待隊列示意以下所示,注意了,以後分析過程當中所說的 queue,也就是阻塞隊列不包含 head,不包含 head,不包含 head。

轉存失敗從新上傳取消aqs-0

等待隊列中每一個線程被包裝成一個 node,數據結構是鏈表,一塊兒看看源碼吧:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 標識節點當前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 標識節點當前在獨佔模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的幾個int常量是給waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代碼此線程取消了爭搶這個鎖
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示當前node的後繼節點對應的線程須要被喚醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,因此略過吧,下一篇文章會介紹這個
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 一樣的不分析,略過吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值爲上面的一、-一、-二、-3,或者0(之後會講到)
    // 這麼理解,暫時只須要知道若是這個值 大於0 表明此線程取消了等待,
    // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是能夠指定timeouot的。。。
    volatile int waitStatus;
    // 前驅節點的引用
    volatile Node prev;
    // 後繼節點的引用
    volatile Node next;
    // 這個就是線程本尊
    volatile Thread thread;

}

Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next 四個屬性而已,你們先要有這個概念在內心。

上面的是基礎知識,後面會屢次用到,內心要時刻記着它們,內心想着這個結構圖就能夠了。下面,咱們開始說 ReentrantLock 的公平鎖。多嘴一下,我說的阻塞隊列不包含 head 節點。

轉存失敗從新上傳取消aqs-0

首先,咱們先看下 ReentrantLock 的使用方式。

// 我用個web開發中的service概念吧
public class OrderService {
    // 使用static,這樣每一個線程拿到的是同一把鎖,固然,spring mvc中service默認就是單例,別糾結這個
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // 好比咱們同一時間,只容許一個線程建立訂單
        reentrantLock.lock();
        // 一般,lock 以後緊跟着 try 語句
        try {
            // 這塊代碼同一時間只能有一個線程進來(獲取到鎖的線程),
            // 其餘的線程在lock()方法上阻塞,等待獲取到鎖,再進來
            // 執行代碼...
            // 執行代碼...
            // 執行代碼...
        } finally {
            // 釋放鎖
            reentrantLock.unlock();
        }
    }
}

ReentrantLock 在內部用了內部類 Sync 來管理鎖,因此真正的獲取鎖和釋放鎖是由 Sync 的實現類來控制的。

abstract static class Sync extends AbstractQueuedSynchronizer {

}

Sync 有兩個實現,分別爲 NonfairSync(非公平鎖)和 FairSync(公平鎖),咱們看 FairSync 部分。

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

線程搶鎖

不少人確定開始嫌棄上面廢話太多了,下面跟着代碼走,我就不廢話了。

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
      // 爭鎖
    final void lock() {
        acquire(1);
    }
      // 來自父類AQS,我直接貼過來這邊,下面分析的時候一樣會這樣作,不會給讀者帶來閱讀壓力
    // 咱們看到,這個方法,若是tryAcquire(arg) 返回true, 也就結束了。
    // 不然,acquireQueued方法會將線程壓到隊列中
    public final void acquire(int arg) { // 此時 arg == 1
        // 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試
        // 由於有可能直接就成功了呢,也就不須要進隊列排隊了,
        // 對於公平鎖的語義就是:原本就沒人持有鎖,根本不必進隊列等待(又是掛起,又是等待被喚醒的)
        if (!tryAcquire(arg) &&
            // tryAcquire(arg)沒有成功,這個時候須要把當前線程掛起,放到阻塞隊列中。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
              selfInterrupt();
        }
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    // 嘗試直接獲取鎖,返回值是boolean,表明是否獲取到鎖
    // 返回true:1.沒有線程在等待鎖;2.重入鎖,線程原本就持有鎖,也就能夠理所固然能夠直接獲取
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0 此時此刻沒有線程持有鎖
        if (c == 0) {
            // 雖然此時此刻鎖是能夠用的,可是這是公平鎖,既然是公平,就得講究先來後到,
            // 看看有沒有別人在隊列中等了半天了
            if (!hasQueuedPredecessors() &&
                // 若是沒有線程在等待,那就用CAS嘗試一下,成功了就獲取到鎖了,
                // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 =_=
                // 由於剛剛還沒人的,我判斷過了

更多內容請關注微信公衆號【Java技術江湖】

一位阿里 Java 工程師的技術小站。做者黃小斜,專一 Java 相關技術:SSM、SpringBoot、MySQL、分佈式、中間件、集羣、Linux、網絡、多線程,偶爾講點Docker、ELK,同時也分享技術乾貨和學習經驗,致力於Java全棧開發!(關注公衆號後回覆」Java「便可領取 Java基礎、進階、項目和架構師等免費學習資料,更有數據庫、分佈式、微服務等熱門技術學習視頻,內容豐富,兼顧原理和實踐,另外也將贈送做者原創的Java學習指南、Java程序員面試指南等乾貨資源)

相關文章
相關標籤/搜索