Java併發編程(2) AbstractQueuedSynchronizer的設計與實現

一 前言

  上一篇分析AQS的內部結構,其中有介紹AQS是什麼,以及它的內部結構的組成,那麼今天就來分析下前面說的內部結構在AQS中的具體做用(主要在具體實現中體現)。html

二 AQS的接口和簡單示例

  上篇有說到AQS是抽象類,而它的設計是基於模板方法模式的,也就是說:使用者須要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用其提供的模板方法。其中須要子類重寫的方法與描述以下表:java

方法名稱 描述
protected boolean tryAcquire(int arg)

嘗試以獨佔模式獲取。 此方法應查詢對象的狀態是否容許以獨佔模式獲取它,若是是,則獲取它。node

實現該方法須要查詢當前狀態並判斷同步狀態是否預期,而後進行CAS設置同步狀態。ide

protected boolean tryRelease(int arg)

嘗試釋放獨佔式的同步狀態。函數

等待獲取同步狀態的線程將有機會獲取同步狀態。測試

protected int tryAcquireShared(int arg)

嘗試以共享模式獲取。 此方法應查詢對象的狀態是否容許在共享模式下獲取它,若是是,則獲取。ui

實現該方法須要查詢當前狀態並判斷同步狀態是否預期,而後進行CAS設置同步狀態。this

protected boolean tryReleaseShared(int arg)

嘗試釋放共享式的同步狀態。spa

protected boolean isHeldExclusively() 表示當前同步器是否在獨佔模式下被線程佔用。

  在重寫上面這些方法時,可能須要下面這三個方法(注意其中state是使用volatile關鍵字修飾的)線程

方法名 描述
protected final int getState()  獲取當前的同步狀態
protected final void setState(int newState)  設置當前同步狀態
protected final boolean compareAndSetState
(int expect, int update)
使用CAS設置當前狀態,該方法能保證狀態設置的原子性

  其實前面這些都不須要關心,由於這些通常都是在自定義同步組件中實現。自定義同步組件除了重寫第一個表格那些方法外,AQS還爲其提供了一些公共方法(或者說模板方法),這些纔是關鍵,也是重中之重。下面我先簡單列出以及其方法描述,後面一一分析:

方法名稱 描述
public final void acquire(int arg)

獨佔式獲取同步狀態,忽略中斷。

若是當前線程獲取同步狀態成功,則由該方法返回;不然將會進入同步隊列等待(

即上篇說的Node節點隊列)。

該方法將會調用重寫的tryAcquire(int args)方法。

public final void
acquireInterruptibly(int arg)

與acquire(int args)方法同樣,可是該方法響應中斷(從方法名就大概知道意思了吧。)

當前線程未獲取到同步狀態而進入同步隊列中,若是當前線程被中斷,則該方法會拋出InterruptedException異常

public final boolean release(int arg)

獨佔式的釋放同步狀態, 該方法會在釋放同步狀態後將同步隊列中第一個節點包含的線程喚醒。

該方法會調用tryRelease(int args)方法

public final void acquireShared(int arg)

共享式獲取同步狀態,忽略中斷。

若是當前線程獲取同步狀態成功,則由該方法返回;不然將會進入同步隊列等待

(即上篇說的Node節點隊列)。

與獨佔式獲取的主要區別是在同一時刻能夠有多個線程獲取到同步狀態。

該方法將會調用重寫的tryAcquireShare(int args)方法。

public final void acquireSharedInterruptibly(int arg) 與acquireInterruptibly方法相同
public final boolean
releaseShared(int arg)
 共享式的釋放同步狀態

該方法會調用tryReleaseShared(int args)方法

  根據上面提供的方法,同步器主要提供兩種模式:獨佔式和共享式。顧名思義,獨佔式表示同一時刻只有一個線程纔會獲取到同步狀態,而其餘線程都得等待;而共享式就容許同一時刻能夠多個線程獲取到同步狀態。至於示例的話,你們能夠查看源碼類上註釋的Mutx類,表示一個自定義的獨佔鎖。下面我仍是直接貼出示例代碼。

class Mutex implements Lock, java.io.Serializable {
    // 內部類,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否處於佔用狀態
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        // 當狀態爲0的時候獲取鎖
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                // 將當前線程設置爲獨佔線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 釋放鎖,將狀態設置爲0
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 斷言
            if (getState() == 0) throw new IllegalMonitorStateException();
            // 將線程或狀態 重置爲初始值
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // 返回一個Condition,每一個condition都包含了一個condition隊列
        Condition newCondition() { return new ConditionObject(); }
    }
    // 僅須要將操做代理到Sync上便可
    private final Sync sync = new Sync();
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}
View Code

  看了下自定義的獨佔鎖Metux(上面代碼來自源碼),寫個案例測試下它究竟是否是獨佔鎖(你們應該知道怎麼測試吧)。

public class MutexTest {

    private Lock lock ;
    private MutexTest(Lock lock) {
        this.lock = lock;
    }

    public void runTask() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 執行任務中...");
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + " 任務執行完成。");
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Lock lock = new Mutex();
        final MutexTest test = new MutexTest(lock);
        for (int i = 0; i < 5; i ++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    test.runTask();
                }
            }, "線程" + i).start();
        }
    }
}
View Code

  運行該案例從打印結果中能夠看出,同一時刻只有一個線程在執行(這就是獨佔鎖的特性)。

線程0 執行任務中...
線程0 任務執行完成。
線程2 執行任務中...
線程2 任務執行完成。
線程1 執行任務中...
線程1 任務執行完成。
線程3 執行任務中...
線程3 任務執行完成。
線程4 執行任務中...
線程4 任務執行完成。

三 AQS的核心函數分析

  關於獲取和釋放下面只分析acquire函數和release函數,由於其餘都與這個函數相似。

一、acquire函數

    /**
     * 獨佔式獲取同步狀態,忽略中斷。
     */
    public final void acquire(int arg) {
        /**
         * 1 調用子類的tryAcquiref(arg)方法,若是獲取成功則直接返回,不然以獨佔模式建立節點加入等待隊列
         */
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  acquire函數主要功能都放在這三個方法中:

  • tryAcquire(arg) 子類提供實現
  • addWaiter(Node) 主要是將節點添加到等待隊列中。
  • acquireQueue(Node, int) 主要是提取等待隊列中能獲取同步狀態的節點(遵循FIFO)。

  1.2 下面先分析下addWaiter(Node)函數:

/**
 * 2 根據給定模式爲當前線程建立並排隊節點。
 */
private Node addWaiter(Node mode) {
    // 2.1 根據跟定的模式和當前線程建立節點。(在這就用的上Node了)
    Node node = new Node(Thread.currentThread(), mode);
    // 2.2 嘗試下快速通道:判斷tail節點是否爲空,若是不爲空就直接添加到尾節點後面。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 2.2.1 進入到這個方法說明線程並無獲取鎖,因此須要CAS保證原子性
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 2.3 若是是第一個入隊的節點或者compareAndSetTail設置失敗,那麼就進入enq()方法
    enq(node);
    return node;
}
/**
 * 將節點插入隊列,必要時進行初始化。
 */
private Node enq(final Node node) {
    // 自旋,直至設置添加尾節點成功。
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 2.3.1 尾節點爲空,則須要初始化隊列(同理採起CAS保證原子性)
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 2.3.2 尾節點不爲空,則將節點設置成尾節點(同理採起CAS保證原子性)
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

  上述邏輯主要包括:使用當前線程建立節點,而後將當前節點添加到同步隊列中。其中設置節點都是利用CAS設置,保證原子性。

  具體流程:

  a 先行嘗試在隊尾添加(若是尾節點不爲空)(另外這一步很重要,若是尾節點存在就能夠以最短路徑O(1)的效果來完成線程入隊,是最大化減小開銷的一種方式):

    • 分配引用prev指向尾節點;
    • 將節點的前驅節點更新爲尾節點(current.prev = tail);
    • 若是尾節點是prev,那麼將當尾節點設置爲該節點(tail = current,原子更新);
    • prev的後繼節點指向當前節點(prev.next = current)。

  b 若是是第一個入隊的節點或者compareAndSetTail設置失敗:

    • 若是尾節點爲空,則須要初始化隊列(同理採起CAS保證原子性),繼續自旋判斷;

    •  重複上面a步驟將節點嘗試添加至尾節點後,直接添加成功。      

  1.3 進入sync隊列以後,接下來就是要進行同步狀態的獲取,下面請看acquireQueue(Node, arg)函數: 

/**
 * 3 對於已經在隊列中的線程,以獨佔不間斷模式獲取。
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 一樣採起自旋直至條件知足
        for (;;) {
            // 3.1 獲取當前節點的前驅節點
            final Node p = node.predecessor();
            // 3.2 判斷前驅節點是否爲頭節點,並此時是否能夠獲取到同步狀態
            if (p == head && tryAcquire(arg)) {
                // 3.2.1 若上面條件知足,則將當前節點設置爲頭節點。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  上述邏輯主要包括:

    • 獲取當前節點的前驅節點;須要獲取當前節點的前驅節點,而頭結點所對應的含義是當前佔有鎖且正在運行。
    • 當前驅節點是頭結點而且可以獲取狀態,表明該當前節點佔有鎖;

      若是知足上述條件,那麼表明可以佔有鎖,根據節點對鎖佔有的含義,設置頭結點爲當前節點。

    • 不然進入等待狀態。

      若是沒有輪到當前節點運行,那麼將當前線程從線程調度器上摘下,也就是進入等待狀態。也就是調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt函數

   下面看下它是怎麼將不知足節點摘下來進入等待狀態的。

/**
 * 檢查並更新獲取失敗的節點的狀態。
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 狀態處於SIGNAL狀態(-1),表示後繼節點隨時能夠upark
         */
        return true;
    if (ws > 0) {
        /*
         * ws > 0表示處於CANCELLED狀態,則須要跳過找到node節點前面不處於取消狀態的節點。
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 此時ws爲PROPAGATE -3 或者是0 表示無狀態,(爲CONDITION -2時,表示此節點在condition queue中)
         * 比較並設置前驅結點的狀態爲SIGNAL
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    /**
     * 此時還不肯定Node的前置節點是否處於SIGNAL狀態
     * 因此不支持park操做
     */
    return false;
}

/**
 * 進行park操做而且返回該線程是否被中斷
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

   上述主要邏輯包括:

    • 若是前置節點的狀態是Signal狀態,則返回true。
    • 若是前置節點處於取消狀態,則跳過這種取消節點,找到不是前面不是取消狀態的節點而後返回false;
    • 若是前置節點處於<0的狀態,則利用CAS將其狀態設置成Signal狀態,而後返回false.
    • 通過上面步驟後,若是返回true,則說明能夠中斷線程進入等待。

   那麼acquire函數分析到這就結束了,估計看了一遍仍是不太清晰流程那麼就多看幾遍。下面也對這個流程進行總結下:

二、release函數  

/**
 * 以獨佔模式釋放
 */
public final boolean release(int arg) {
    // tryRelease由子類實現
    if (tryRelease(arg)) {
        // 獲取頭結點
        Node h = head;
        // 頭結點不爲空而且頭結點狀態不爲0
        if (h != null && h.waitStatus != 0)
            // 釋放頭結點的後繼結點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

/**
 * 喚醒後繼節點
 */
private void unparkSuccessor(Node node) {
    // 獲取節點狀態
    int ws = node.waitStatus;
    // 若是節點狀態小於,則將其設置爲初始狀態。
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 若是節點狀態是取消或節點爲空,則從尾部向後移動以找到實際未取消的繼任者。
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

  上述邏輯主要包括:

    • 嘗試釋放狀態;

      tryRelease可以保證原子化的將狀態設置回去,固然須要使用compareAndSet來保證。若是釋放狀態成功過以後,將會進入後繼節點的喚醒過程。

    •  喚醒當前節點的後繼節點所包含的線程。

      經過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。

四 總結(獲取與釋放過程)

  1. 在獲取時,維護了一個sync隊列,每一個節點都是一個線程在進行自旋,而依據就是本身是不是首節點的後繼而且可以獲取資源;(重點,不清楚的能夠看上面的流程圖)
  2. 在釋放時,僅僅須要將資源還回去,而後通知一下後繼節點並將其喚醒。
  3. 這裏須要注意,隊列的維護(首節點的更換)是依靠消費者(獲取時)來完成的,也就是說在知足了自旋退出的條件時的一刻,這個節點就會被設置成爲首節點。

 

  另外送你們一碗心靈雞湯:)

我從不相信什麼懶洋洋的自由,我向往的自由是經過勤奮和努力實現更廣闊的人生,那樣的自由纔是珍貴的、有價值的。我相信一萬小時定律,我歷來不相信天上掉餡餅的靈感和坐等的成就。作一個自由又自律的人,靠勢必實現的決心認真地活着。

相關文章
相關標籤/搜索