老闆讓只懂Java基本語法的我,基於AQS實現一個鎖

10 點整,我到了公司,又成爲全組最後一個到的員工。java

正準備刷刷手機摸摸魚,看見老闆神祕兮兮地走了過來。程序員

老闆:閃客呀,你寫個工具,基於 AQS 實現一個鎖,給我們組其餘開發用面試

:哦好的算法

老闆:你多久能搞好?編程

:就是一個工具類是吧,嗯今天下午就給你吧設計模式

老闆:嗯,那你抓緊時間搞吧,你們都等着用呢安全

:哦好的多線程


先寫個框架

關於鎖,我還算有一個模糊的認識的,要讓使用者能夠獲取鎖、釋放鎖,來實現多線程訪問時的安全性。因而我趕忙先把一個框架寫了出來。併發

// 給帥氣老闆用的鎖
public class FlashLock {
    // 釋放鎖
    public void lock() {}
    // 釋放鎖
    public void unlock() {}
}

工具類已經完成一半了,一想到全組的開發們下午就會這樣用到個人工具,我不由笑出了聲音。框架

FlashLock flashLock = new FlashLock();

public void doSomeThing() {
    // 獲取鎖,表示同一時間只容許一個線程執行這個方法
    flashLock.lock();
    try {
        ...
    } finally {
        // 優雅地在 finally 裏釋放鎖
        flashLock.unlock();
    }
}

隨着同事們投來異樣的眼光,我回過神來。繼續想,我怎麼在這倆方法裏實現這種鎖的效果呢?腦子一片空白呀,誒不過老闆剛剛說要基於 AQS,那確定這個東西能夠給我提供一些方便吧,因而我在百度百科搜了一下什麼是 AQS

百度百科還沒有收錄詞條 「AQS」

這老闆水平也太次了,給我推薦個百科上都搜不到的東西... 只能搜搜百度了

額!這看起來仍是個 Java 面試的重點呢!真是錯怪老闆了。

我點了其中一篇,瞭解到 AQS 的全稱叫 AbstractQueuedSynchronizer(抽象的隊列式同步器),是一個 JDK 源碼中的一個

嗨,搞了半天只是個類而已嘛,對我這種源碼在手天下我有的神級碼農,還看什麼文章呀,我迅速打開了 JDK1.8 源碼,找到了這個類。

個人天,一共 2316 行!我趕忙把全部註釋都去掉,發現還有 914 行。

因爲下午就要交稿,我打消了不看註釋硬啃源碼的念頭,開始從頭看起了註釋...

2:使用 AQS 實現最簡單的鎖

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released.

第一句話說這是個框架,以後說這個類是基於一個原子變量,這說的都是原理我先無論。

後面又說子類(Subclasses)必須實現一些改變狀態(change this state)和獲取釋放鎖(acquired or released)的方法。

哦!看來我須要用一個子類繼承它,而後實現它指定的一些方法,其餘的事情這個父類都會幫我作好的。敏銳的我立刻察覺到,這用的模板方法這種設計模式,這是我最喜歡的設計模式了,由於只須要讀懂須要讓子類實現的模板方法的含義,便可以很好地使用這個類的強大功能。

因而我趕忙去找,有哪些這樣的模板方法,須要子類去實現,果真在註釋中發現了這樣一段話。

* To use this class as the basis of a synchronizer, redefine the
 * following methods
 *
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>

在源碼中找到這幾個類

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

一看清一色都是拋出異常我就放心了,這正是留給咱們子類實現的模板方法呀,接下來就是我寫個類實現他們就好咯,但是怎麼寫...

正想去百度,忽然發現註釋中竟然給出了一段 基於 AQS 的實現小 demo,還挺長,我理解了它的意思,而且把我看不懂的都去掉了,寫出了很簡潔的鎖

public class FlashLock {

    // 獲取鎖(這回填好骨肉了)
    public void lock() {
        sync.acquire(1);
    }
    // 釋放鎖
    public void unlock() {
        sync.release(1);
    }

    private final Sync sync = new Sync();

    // 這個內部類就是繼承並實現了 AQS 但我這裏只先實現兩個方法
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        public boolean tryAcquire(int acquires) {
            // CAS 方式嘗試獲取鎖,成功返回true,失敗返回false
            if (compareAndSetState(0, 1)) {
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            // 釋放鎖,這裏爲何不像上面那樣也是 CAS 操做呢?請讀者思考
            setState(0);
            return true;
        }
    }
}

lock 和 unlock 方法都實現了,我趕忙寫個經典的測試代碼

// 可能發生線程安全問題的共享變量
private static long count = 0;

// 兩個線程併發對 count++
public static void main(String[] args) throws Exception {
    // 建立兩個線程,執行add()操做
    Thread th1 = new Thread(()-> add());
    Thread th2 = new Thread(()-> add());
    // 啓動兩個線程
    th1.start();
    th2.start();
    // 等待兩個線程執行結束
    th1.join();
    th2.join();
    // 這裏應該是 20000 就對了,說明鎖生效了
    System.out.println(count);
}

// 我畫了一上午寫出來的鎖,哈哈
private static ExampleLock exampleLock = new ExampleLock();

// 循環 count++,進行 10000 次
private static void add() {
    exampleLock.lock();
    for (int i = 0; i < 10000; i++) {
        count++;
    }
    add2();
    // 沒啥異常,我就直接釋放鎖了
    exampleLock.unlock();
}

測了好幾回,發現都是 20000,哈哈,大功告成,我趕忙在大羣裏 @全部人,告訴你們我寫的新工具。同事和老闆紛紛給我點了贊。

我又忍不住笑出了聲音。走出了公司,準備找個地方吃午餐。

不得不研究下 AQS 的原理

下午兩點整,我又成爲公司最後一個午睡起牀的人...

小宇:閃客,你的工具類確實好用,並且源碼也很簡潔

:哈哈,你們喜歡用就好

小宇:不過我有個問題,就是我用你的這個鎖工具,有的線程老是搶不到鎖,有的線程老是能搶到鎖。雖然說線程們搶鎖確實看命,但能不能加入一種設計,讓各個線程機會均等些,起碼不要出現某幾個線程老是特倒黴搶不到鎖的狀況呢?

:這怎麼可能,我就是寫個鎖工具,還能影響到人家 CPU 和操做系統層面的機制?

小宇:你想一想吧,做爲公司最帥的程序猿,我相信你哦

:額這...

我這人最禁不住妹子誇獎,趕忙開啓電腦屏幕,盯着個人獲取鎖的代碼看

@Override
public boolean tryAcquire(int acquires) {
    // 一上來就 CAS 搶鎖
    if (compareAndSetState(0, acquires)) {
        return true;
    }
    return false;
}

我發現這段代碼中在嘗試獲取鎖時,一上來就 CAS 搶鎖,一旦成功就返回了 true。那我這裏是否能加入某些機制,使這些線程不要一有機會就開始直接開始搶鎖,而是先考慮一下其餘線程的感覺再決定是否搶鎖呢?

我發現此時不得不研究一下 AQS 的內部實現邏輯了,也就是原理,看看能不能獲得一些思路。

我看 AQS 雖然方法一大堆,但屬性一共就四個(有一個是內部類 Node)

public abstract class AbstractQueuedSynchronizer {
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;
    static final class Node {}
}

static final class Node {
    // ... 省略一些暫不關注的
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
}

結合最開始看那段對 AQS 高度歸納的註釋

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.

不難猜到這裏的內部類 Node 以及其類型的變量 headtail 就表示 AQS 內部的一個等待隊列,而剩下的 state 變量就用來表示鎖的狀態

等待隊列應該就是線程獲取鎖失敗時,須要臨時存放的一個地方,用來等待被喚醒並嘗試獲取鎖。再看 Node 的屬性咱們知道,Node 存放了當前線程的指針 thread,也便可以表示當前線程並對其進行某些操做,prev 和 next 說明它構成了一個雙向鏈表,也就是爲某些須要獲得前驅後繼節點的算法提供便利。

太好了,僅僅看一些屬性和一段註釋,就獲得了一個關於 AQS 大體原理的猜想,看起來還挺靠譜,我趕忙把它畫成幾張圖來加深理解。(因爲這裏很是重要,就再也不賣關子了,直接畫出最正確的圖理解,但不會過於深刻細節)

如下的圖是 AQS 最爲核心的幾行代碼的直觀理解過程,請你們仔細品味

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

看完圖沒太消化的,這裏還有一次機會,我來捋一捋。

首先初始狀態 AQS 的 state=0,表示沒有線程持有鎖。head 和 tail 也都爲空,表示此時等待隊列裏也沒有線程。

這時第一個線程(線程1)來了,沒有任何線程和它搶,直接拿到了鎖(state=1)

而後線程2也來了,假設此時線程1沒有釋放鎖,那麼線程2搶鎖失敗(執行你本身寫的 tryAcquire)。失敗後,剩下的事都是 AQS 幫你作的,首先加入等待隊列的隊尾 addWaiter​(此時隊列爲空,因此要先初始化一個佔位的頭結點),而後在循環裏嘗試獲取鎖 acquireQueue(注意這裏面有至關多的細節,首先前驅結點是頭結點的才能嘗試獲取鎖,即排在隊頭的纔有機會。再有循環裏獲取鎖並非一直 CAS,而是經過一個標誌控制了次數,使得 CAS 兩次都失敗後就將線程掛起 park,以後等待持有鎖的線程釋放鎖以後再喚醒 unpark。其他各類細節,但願讀者閱讀源碼,不是一句兩句說清楚的)。

而後線程3也來了​,經歷了和線程2同樣的經歷,只不過它的前驅結點不是頭結點,所以還不能有機會嘗試獲取鎖,只有等線程2搶到了鎖而且出隊,本身的前驅結點變成了頭結點,才能夠。

這時線程1終於釋放了鎖(state=0),與此同時找到隊列的頭結點進行喚醒 unpark。此時頭結點是線程2表示的 Node,所以對線程2進行了喚醒操做。若是此時線程2沒有被掛起,說明還在嘗試獲取鎖的過程當中,那麼就嘗試好了。若是已經被掛起了,那麼喚醒線程2,使得線程2繼續不斷嘗試 CAS 獲取鎖,直到成功爲止。

如此,循環往復... 你大概懂了麼?

嗯原理搞懂了,實現一個公平鎖

仔細看上面的倒數第二張圖。

好好好,你懶得往上翻,我給你粘過來。

本來在隊列中等待的線程 2,被線程 1 釋放鎖以後喚醒了,但它仍然須要搶鎖,並且有可能搶失敗

那若是每次這個線程 2 嘗試搶鎖時,都有其餘新來的線程把鎖搶去,那線程 2 就一直得不到運行機會,並且排在線程 2 後面的等待線程,也都沒有機會運行。

致使有的線程一直得不到運行機會的,就是這個新進來的線程每次都無論有沒有人排隊,都直接上來就搶鎖致使的。

妥了,剛剛小宇提出的問題,我終於有了思路,就是讓新來的線程搶鎖時,先問一句,「有沒有人排隊呀?若是有人排隊那我先排到隊尾好了」。

@Override
public boolean tryAcquire(int acquires) {
    // 原有基礎上加上這個
    if (有線程在等待隊列中) {
        // 返回獲取鎖失敗,AQS會幫我把該線程放在等待隊列隊尾的
        return false;
    }
    if (compareAndSetState(0, 1)) {
        return true;
    }
    return false;
}

怎麼判斷是否有線程在等待隊列呢?機智的我以爲,AQS 這麼優秀的框架必定爲上層提供了一個方法,不會讓咱們深刻到它實現的內部的,果真我找到了。

public final boolean hasQueuedPredecessors()

再通過優化結構後,最終的代碼變成了這樣

@Override
public boolean tryAcquire(int acquires) {
    if (hasQueuedPredecessors() &&
            compareAndSetState(0, 1)) {
        return true;
    }
    return false;
}

哈哈,大功告成,趕忙去找小宇顯擺一下。

等等...

那我原來的那種實現方式就沒了,確定有其餘人找我質問,emmm,我兩種方式都暴露給你們吧,隨你們選。

我將原來的暴力搶鎖方式起了個名,叫非公平鎖,由於線程搶鎖不排隊,純看臉。按小宇需求實現的排隊獲取鎖,我叫它公平鎖,由於只要有線程在排隊,新來的就得乖乖去排隊,不能直接搶。

// 想要公平鎖,就傳 true 進來
public FlashLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

哈哈,有了高大上的名字和代碼實現,我興高采烈去找小宇交差了。

老闆要求方法能夠重入

晚上五點半,我正準備成爲全組第一個去吃飯的人,忽然老闆陰着臉跑了過來。

老闆:閃客,我用你這工具致使一個線程卡死了呀,一直獲取不到鎖

:嗯怎麼會呢?

老闆:代碼發你了,你趕忙看看!

我打開了鎖了屏的電腦,點開了老闆發來的代碼

public void doSomeThing2() {
    flashLock.lock();
    doSomeThing2();
    flashLock.unlock();
}

public void doSomeThing2() {
    flashLock.lock();
    ...
    flashLock.unlock();
}

我恍然大悟,原來一個線程執行了一個方法,獲取了鎖,這個方法沒有結束,又調用了另外一個須要鎖的方法,因而卡在這不再走了。

這個原理很容易理解,但這彷佛用起來確實不太友好,怪不得老闆那麼生氣。有沒有辦法,讓同一個線程持有鎖時,還能繼續獲取鎖(可重入),只有當不一樣線程才互斥呢?

我苦思冥想,感受不對呀,如今 AQS 裏面的全部變量我都用到了,沒見哪一個變量能夠記錄當前線程呀。

哦對!AQS 自己還繼承了 AbstractOwnableSynchronizer 這個類!我很快在這個類裏面發現了這個屬性!

/**
 * The current owner of exclusive mode synchronization.
 */
private transient Thread exclusiveOwnerThread;

熟悉了以前的套路,我很快又找到了這兩個方法!

protected final void setExclusiveOwnerThread(Thread thread);
protected final Thread getExclusiveOwnerThread();

大功告成,此時我只要在一個線程發現鎖已經被佔用時,不直接放棄,而是再看一下佔用鎖的線程是否是正是我本身,就行了。有了前面的經驗,此次我直接寫出了最終的可重入的公平鎖代碼。

@Override
public boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (hasQueuedPredecessors() && compareAndSetState(0, 1)) {
            // 拿到鎖記得記錄下持鎖線程是本身
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // 看見鎖被佔了(state=0)也別放棄,看看是否是本身佔的
        setState(c + acquires);
        return true;
    }
    return false;
}

6. 下班!

我把這個最終版的鎖代碼提交,霸氣地收拾東西下班了,今天真是收穫滿滿。

好啦故事講完了,若是你堅持讀到了這裏而且徹底理解了上面的全部事情,那麼恭喜你,你已經掌握了 AQS 的核心原理以及基於它的一個經典的鎖實現 ReentrantLock 的幾乎所有知識點,AQS 的體系骨架算是被你不知不覺創建起來了,這兩個都是 Java 程序員面試必備的東西。

雖然這只是皮毛,但若是你是第一次接觸這兩個概念,那本篇文章的最大意義在於對他們有了一個三觀很正的第一印象。我但願 AQS 的給你的第一印象不是什麼抽象的隊列式同步器,而只是一個爲了更方便實現各類鎖而提供的一個包含幾個模板方法的類而已,雖然並不許確,並且顯得很 low,但實則可能偏偏是說到了本質。

7. 繼續深刻 AQS

我以後也會出關於 AQS 繼續深刻的文章,不過下面的三篇系列文章你能夠花上兩三個小時在電腦上看一下,真的很是很是很是給力。

另外,我也推薦你,用跟蹤源碼或 debug 的方式,從頭至尾本身跟一遍下面三行代碼,是幾乎 AQS 的所有核心邏輯,這個看懂了,其餘的都是浮雲。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

最後呢,若是你這兩個都看不下去,關注低併發編程,一樣能獲得有趣且深刻的理解,哈哈哈。

相關文章
相關標籤/搜索