Java 隊列同步器 AQS


本文部分摘自《Java 併發編程的藝術》java


概述

隊列同步器 AbstractQueuedSynchronize(如下簡稱同步器),是用來構建鎖(Lock)或者其餘同步組件(JUC 併發包)的基礎框架,它使用了一個 int 成員變量表示同步狀態,經過內置的 FIFO 隊列來完成資源獲取線程的排隊工做編程

同步器的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態,子類推薦被定義爲自定義同步組件的靜態內部類。同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態的獲取和釋放方法來供自定義組件使用併發

一言以蔽之,同步器是實現鎖(也能夠是任意同步組件)的一種方式,它屏蔽了更加底層的一些機制,使開發者更易於理解和使用框架


隊列同步器的接口

同步器的設計是基於模板方法模式的,使用者須要繼承隊列同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法ide

1. 訪問或修改同步狀態

重寫同步器指定的方法時,須要使用同步器提供的以下三個方法來訪問或修改同步狀態:工具

  • getState()測試

    獲取當前同步狀態ui

  • setState(int newState)線程

    設置當前同步狀態設計

  • compareAndSetState(int expect, int update)

    使用 CAS 設置當前狀態,該方法能保證狀態設置的原子性

2. 同步器可重寫的方法

方法名稱 描述
protected boolean tryAcquire(int arg) 獨佔式獲取同步狀態,實現該方法須要查詢當前狀態,並判斷同步狀態是否符合預期,而後再進行 CAS 設置同步狀態
protected boolean tryRelease(int arg) 獨佔式地釋放同步狀態,等待獲取同步狀態的線程將有機會獲取同步狀態
protected int tryAcquireShared(int arg) 共享式獲取同步狀態,返回大於等於 0 的值,表示獲取成功,不然獲取失敗
protected boolean tryReleaseShared(int arg) 共享式釋放同步狀態
protected boolean isHeldExclusively() 當前同步器是否在獨佔模式下被線程佔有,通常該方法表示是否被當前線程所獨佔

3. 同步器提供的模板方法

方法名稱 描述
void acquire(int arg) 獨佔式獲取同步狀態,若是當前線程獲取同步狀態成功,則由該方法返回,不然,將會進入同步隊列等待,該方法將會調用重寫的 tryAcquire(int arg) 方法
void acquireInterruptibly(int arg) 與 acquire(int arg) 相同,但該方法響應中斷,當前線程未獲取到同步狀態而進入同步隊列中,若是當前線程被中斷,則該方法會拋出 InterruptedException 並返回
boolean tryAcquireNanos(int arg, long nanos) 在 acquireInterruptibly(int arg) 的基礎上增長了超時限制
void acquireShared(int arg) 共享式的獲取同步狀態,與獨佔式獲取的主要區別是在同一時刻能夠有多個線程獲取到同步狀態
void acquireSharedInterruptibly(int arg) 與 acquireShared(int arg) 相同,該方法響應中斷
boolean tryAcquireSharedNanos(int arg, long nanos) 在 acquireSharedInterruptibly 的基礎上增長了超時限制
boolean release(int arg) 獨佔式的釋放同步狀態,該方法會在釋放同步狀態以後,將同步隊列中第一個節點包含的線程喚醒
boolean releaseShared(int arg) 共享式的釋放同步狀態
Collection<Thread> getQueuedThreads() 獲取等待在同步隊列上的線程集合

4. 示例

下面經過一個獨佔鎖的示例來深刻了解一下同步器的工做原理。顧名思義,獨佔鎖就是在同一時刻只能有一個線程獲取到鎖,其餘獲取鎖的線程只能處於同步隊列中等待,只有獲取鎖的線程釋放了鎖,後繼的線程才能獲取鎖

public class Mutex implements Lock {

    /**
     * 自定義同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            // 是否處於佔用狀態
            return getState() == 1;
        }

        @Override
        public boolean tryAcquire(int acquires) {
            // 當狀態爲 0 時獲取鎖
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            // 釋放鎖,將狀態設置爲 0
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 返回一個 Condition, 每一個 condition 都包含一個 condition 隊列
         */
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

Mutex 中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨佔式獲取和釋放同步狀態。用戶使用 Mutex 時並不會直接和內部同步器實現打交道,而是調用 Mutex 提供的方法,大大下降了實現一個可靠自定義組件的門檻


隊列同步器的實現

1. 同步隊列

同步器依賴內部的同步雙向隊列來完成同步狀態的管理,當前線程獲取同步狀態失敗後,同步器會將當前線程及其等待狀態等信息構形成一個節點,並加入同步隊列,同時阻塞當前線程。當同步狀態釋放後,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態

節點是構成同步隊列的基礎,同步器擁有首節點(head)和尾結點(tail),沒有成功獲取同步狀態的線程將會成爲節點並加入該隊列的尾部

同步隊列的基本結構以下:

同步器將節點加入到同步隊列的過程如圖所示:

首節點是獲取同步狀態成功的節點,首節點線程在釋放同步狀態時,會喚醒後繼節點,然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,過程以下:

設置首節點是經過獲取同步狀態成功的線程來完成的,因爲只有一個線程可以成功獲取同步狀態,所以設置頭節點的方法並不須要使用 CAS 來保證,只須要將首節點設置成原首節點的後繼節點並斷開原首節點的 next 引用便可

2. 獨佔式同步狀態獲取與釋放

經過調用同步器的 acquire(int arg) 方法能夠獲取同步狀態,該方法對中斷不敏感,線程獲取同步狀態失敗則進入同步隊列中,後續對線程進行中斷操做,線程不會從同步隊列中移出

獨佔式同步狀態獲取流程,也就是 acquire(int arg) 方法調用流程如圖所示:

若是當前線程獲取同步狀態失敗,就會生成一個節點(獨佔式 Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態),並加入到隊列尾部。一個隊列裏有不少節點,而只有前驅節點是頭節點的節點才能嘗試獲取同步狀態,緣由有兩個:

  • 頭節點是成功獲取到同步狀態的節點,而頭節點的線程釋放了同步狀態以後,將會喚醒其後繼節點,後繼節點的線程被喚醒後須要檢查本身的前驅節點是不是頭節點
  • 維護同步隊列的 FIFO 原則

所以,若是隊列中的非頭節點線程的前驅節點出隊或者被中斷而從等待狀態返回,那麼其隨後會檢查本身的前驅是否爲頭節點,若是是則嘗試獲取同步狀態

當前線程獲取同步狀態並執行了相應邏輯以後,就須要釋放同步狀態,使得後繼節點可以繼續獲取同步狀態。經過調用同步器的 release(int arg) 方法能夠釋放同步狀態,該方法執行時,會喚醒頭節點的後繼節點線程

3. 共享式同步狀態獲取與釋放

共享式獲取與獨佔式獲取最主要的區別在於同一時刻可否有多個線程同時獲取到同步狀態。以文件的讀寫爲例,若一個程序在對文件進行讀操做,那麼這一時刻對於該文件的寫操做均被阻塞,而讀操做可以同時進行。寫操做要求對資源的獨佔式訪問,而讀操做能夠是共享式訪問,兩種不一樣的訪問模式在同一時刻對文件或資源的訪問狀況,以下圖所示:

經過調用同步器的 acquireShared(int arg) 方法能夠共享式地獲取同步狀態,其代碼核心邏輯和 acquire() 差很少,也是判斷當前節點的前驅是否爲頭節點,若是是就嘗試獲取同步狀態。頭節點在釋放同步狀態以後,也會喚醒後續處於等待狀態的節點

問題的關鍵在於如何作到多個線程訪問同步狀態,由於按照上面所講的過程,和獨佔式幾乎沒有任何區別。獨佔式與共享式在實現上的差異其實僅僅在於:每次頭節點釋放同步狀態以後,獨佔式只是把其後繼節點設置爲頭節點,而共享式還多了一個傳播的過程(筆者能力有限,這一塊沒搞明白,就不瞎寫了。。)

與獨佔式同樣,共享式獲取也須要釋放同步狀態,經過調用 releaseShared(int arg) 方法能夠釋放同步狀態,並喚醒後續處於等待狀態的節點

4. 獨佔式超時獲取同步狀態

經過調用同步器的 doAcquireNanos(int arg, long nanosTimeout) 方法能夠超時獲取同步狀態,即在指定的時間段內獲取同步狀態

在介紹這個方法以前,先介紹一下響應中斷的同步狀態獲取過程。Java5 之後,同步器提供了 acquireInterruptibly(int arg) 方法,這個方法在等待獲取同步狀態時,若是當前線程被中斷,會馬上返回,並拋出 InterruptedException

超時獲取同步狀態能夠視爲響應中斷獲取同步狀態的加強版。獨佔式超時和非獨佔式獲取在流程上很是類似,其主要區別在於未獲取到同步狀態時的處理邏輯。acquire(int arg) 在未獲取到同步狀態時,會使當前線程一致處於等待狀態,而 doAcquireNanos(int arg, long nanosTimeout) 會使當前線程等待 nanosTimeout 納秒,若是當前線程在 nanosTimeout 納秒內沒有獲取同步狀態,將會從等待邏輯中自動返回


自定義同步組件

設計一個同步工具:同一時刻,只能容許至多兩個線程同時訪問,超過兩個線程的訪問將被阻塞。顯然這是共享式訪問,主要設計思路以下:

  • 重寫 tryAcquireShared(int args) 方法和 tryReleaseShared(int args) 方法
  • 定義初始狀態 status 爲 2,當一個線程進行獲取,status 減 1,該線程釋放,status 加 1,爲 0 時再有其餘線程進行獲取,則阻塞

示例代碼以下:

public class TwinsLock implements Lock {

    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero");
            }
            setState(count);
        }

        @Override
        public int tryAcquireShared(int reduceCount) {
            while (true) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int reduceCount) {
            while (true) {
                int current = getState();
                int newCount = current + reduceCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) > 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

再編寫一個測試來驗證 TwinsLock 是否按預期工做

public class TwinsLockTest {


    public static void main(String[] args) {

        final Lock lock = new TwinsLock();

        class Worker extends Thread {

            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepUtils.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepUtils.second(1);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        for (int i = 0; i < 10; i++) {
            Worker worker = new Worker();
            worker.setDaemon(true);
            worker.start();
        }

        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1);
            System.out.println();
        }
    }
}

運行該測試用例,發現線程名稱成對輸出,說明同一時刻只有兩個線程可以獲取到鎖

相關文章
相關標籤/搜索