併發編程之 AQS 源碼剖析

前言

JDK 1.5 的 java.util.concurrent.locks 包中都是鎖,其中有一個抽象類 AbstractQueuedSynchronizer (抽象隊列同步器),也就是 AQS, 咱們今天就來看看該類。java

1.結構

類結構

咱們看看該類的結構,該類被 CountDown,ThreadPoolExecutor,ReentrantLock,ReentrantReadWriteLock,Semaphore 的內部類所繼承,而這些內部類都是這些鎖的真正實現,不管是公平鎖仍是非公平鎖。node

也就是說,這些鎖的真正實現都是該類來實現的。那麼,咱們就從這些鎖開始看看是如何實現從鎖到解鎖的。less

2. 重入鎖的 lock 方法

咱們先看看重入鎖 ReentranLock 的 lock 方法。工具

public void lock() {
        sync.lock();
    }

複製代碼

該方法調用了內部類的 sync 抽象類的 lock 方法,該方法的實現有公平鎖和非公平鎖。咱們看看公平鎖是如何實現的:ui

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
複製代碼

調用了 acquire 方法,該方法就是 AQS 的的方法,由於 sync 繼承了 AQS,而公平鎖繼承了 Sync,等於間接繼承了 AQS,咱們看看該方法。this

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

該方法JDK註釋 :spa

以獨佔模式獲取對象,若是被中斷則停止。經過先檢查中斷狀態,而後至少調用一次 tryAcquire(int) 來實現此方法,並在成功時返回。不然在成功以前,或者線程被中斷以前,一直調用 tryAcquire(int) 將線程加入隊列,線程可能重複被阻塞或不被阻塞。可使用此方法來實現 Lock.lockInterruptibly() 方法。操作系統

樓主來簡單說一下該方法的做用:該方法會試圖獲取鎖,若是獲取不到,就會被加入等待隊列等待被喚醒,這個其實和咱們以前分析的 synchronized 是差很少的。線程

咱們仔細看看該方法,首先是 tryAcquire 方法,也就是嘗試獲取鎖,該方法是須要被寫的,父類默認的方法是拋出異常。如何重寫呢?抽象類定義一個標準:若是返回 true,表示獲取鎖成功,反之失敗。設計

tryAcquire

咱們回到 acquire 方法,若是獲取鎖成功,就直接返回了,若是失敗了,則繼續後面的操做,也就是將線程放入等待隊列中:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

咱們先看看 addWaiter(Node.EXCLUSIVE) 方法:

/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

複製代碼

該方法註釋:將當前線程放入到隊列節點。參數呢?參數有2種,Node.EXCLUSIVE 是獨佔鎖,Node.SHARED 是分享鎖。

在 Node 類種定義了這兩個常量:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
複製代碼

獨佔鎖是null,共享鎖是空對象。

咱們看看該方法的步驟:

  1. 建立一個當前線程的 Node 對象(nextWaiter 屬性爲 null, thread 屬性爲 當前線程)。
  2. 獲取到末端節點,若是末端節點不爲 null,則將末端節點設置爲剛剛建立的節點的 prev 屬性。 2.1. 經過 CAS 設置末端節點爲新的節點。若是成功,將剛剛建立的節點設置爲老末端節點的next節點。最後返回。
  3. 若是 tail 末端節點是null,則調用enq 方法。建立一個末端節點,而後,將剛剛建立的末端節點設置爲新節點的 prev 屬性(此時的末端節點就是 head 頭節點)。最後返回剛剛建立的 node 節點。

咱們看看 enq 方法的實現:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

該方法步驟以下:

  1. 死循環,獲取到末端節點,若是是null,則使用CAS建立一個頭節點(頭節點此時也是null),並將頭節點賦值末端節點。
  2. 因爲剛剛CAS 成功,走else 邏輯,將末端節點賦值給新節點的 prev 屬性,使用CAS設置新的末端節點爲剛剛建立的 node對象。而後返回node 對象。

該方法主要就是初始化頭節點和末端節點,並將新的節點追加到末端節點並更新末端節點。

咱們會到 addWaiter 方法中,該方法主要做用就是根據當前線程建立一個 node 對象,並追加到隊列的末端。

咱們再回到 acquire 方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

複製代碼

addWaiter 方法會返回剛剛建立的node 對象,而後調用 acquireQueued 方法,咱們進入該方法查看:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

該方法步驟以下:

  1. 死循環。先獲取 node 對象 prev 節點,若是該節點和 head 相等,說明是他是第二個節點,那麼此時就能夠嘗試獲取鎖了。 1.1 若是獲取鎖成功,就設置當前節點爲 head 節點(同時設置當前node的線程爲null,prev爲null),並設置他的 prev 節點的 next 節點爲 null(幫助GC回收)。最後,返回等待過程當中是否中斷的布爾值。
  2. 若是上面的兩個條件不成立,則調用 shouldParkAfterFailedAcquire 方法和 parkAndCheckInterrupt 方法。這兩個方法的目的就是將當前線程掛起。而後等待被喚醒或者被中斷。稍後,咱們仔細查看這兩個方法。
  3. 若是掛起後被當前線程喚醒,則再度循環,判斷是該節點的 prev 節點是不是 head,通常來說,當你被喚醒,說明你別准許去拿鎖了,也就是 head 節點完成了任務釋放了鎖。而後重複步驟 1。最後返回。

咱們看看 shouldParkAfterFailedAcquire 方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)

            return true;
        if (ws > 0) {

            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
     
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
複製代碼

該方法步驟以下:

  1. 獲取去上一個節點的等待狀態,若是狀態是 SIGNAL -1,就直接返回 true,表示能夠掛起並休息。
  2. 若是 waitStatus 大於 0, 則循環檢查 prev 節點的 prev 的waitStatus,知道遇到一個狀態不大於0。該字段有4個狀態,分別是 CANCELLED = 1,SIGNAL = -1, CONDITION = -2, PROPAGATE = -3,也就是說,若是大於 0,就是取消狀態。那麼,往上找到那個不大於0的節點後怎麼辦?將當前節點指向 那個節點的 next 節點,也就是說,那些大於0 狀態的節點都失效這裏,隨時會被GC回收。
  3. 若是不大於0 也不是 -1,則將上一個節點的狀態設置爲有效, 也就是 -1.最後返回 false。注意,在acquireQueued 方法中,返回 false 後會繼續循環,此時 pred 節點已是 -1 了,所以最終會返回 true。

再看 parkAndCheckInterrupt 方法(掛起並檢查是否中斷):

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
複製代碼

該方法很是的簡單,就是將當前線程掛起,等到有別的線程喚醒(一般是 head 節點中線程),而後返回當前線程是不是被中斷了,注意,該方法會清除中斷狀態。

回到 acquireQueued 方法,總結一下該方法,該方法就是將剛剛建立的線程節點掛起,而後等待喚醒,若是被喚醒了,則將本身設置爲 head 節點。最後,返回是否被中斷。

再回到 acquire 方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

複製代碼

在該方法中,若是獲取鎖失敗並被喚醒,且被中斷了,那麼就執行 selfInterrupt 方法:

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
複製代碼

將當前線程設置中斷狀態位。

好了,到這裏,整個lock 方法,咱們基本就分析完了,能夠說,整個方法就是將線程放入到等待隊列並掛起而後等待 head 節點喚醒。其中,tryAcquire 方法高頻出現,該方法具體實現由子類實現,好比 重入鎖,讀寫鎖,線程池的 worker,其中 CountDown 和 Semaphore 實現的是共享模式的 tryAcquire 方法,但原理相同。AQS 如何定義的?就是返回 true 表示拿到鎖了,返回 false 表示拿鎖失敗,具體如何實現AQS管不了。但他們都依賴一個極其重要的字段 ------- state。

樓主有必要說說這個字段,該字段定義了當前同步器的狀態,若是你們知道 pv 原語的話,應該很好理解這個字段,該字段在 AQS 中是如何定義的:

/** * The synchronization state. */
    private volatile int state;
複製代碼

volatile。該字段可能會被多個線程修改,所以,須要設置爲 volatile ,保證變量的可見性。

咱們能夠看看 重入鎖中的公平鎖是如何使用該字段的。

/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
複製代碼

該方法重寫了 tryAcquire 方法,步驟以下:

  1. 獲取當前線程,獲取鎖(同步器)的狀態。
  2. 若是同步器等於0,就 CAS 設置 state 爲 1,表示同步器被佔用了,而且設置同步器的持有線程爲當前線程(爲了判斷重入)。最後返回拿鎖成功 true。
  3. 若是不是0,而且當前線程就是同步器的持有線程,說明是重入。那麼就將 state 加1,最後返回 true。因此說,當你重入一次,就須要解鎖一次,不然下個線程永遠拿不到鎖。
  4. 若是都不是,返回 false ,表示拿鎖失敗。

從這裏,咱們能夠看到, statei 字段很是的重要,判斷鎖是否被持有徹底根據這個字段來的。這點必定要注意,而這個設計和操做系統的 pv 由殊途同歸之妙。

那麼看完了拿鎖,再看看解鎖,咱們能夠先猜測一下如何設計,首先確定是要將 state 字段設置爲 0,才能讓下個線程拿鎖,而後呢?喚醒等待隊列中的下個線程。讓他嘗試拿鎖。那到底 doug lea 是否是這麼設計的呢?咱們來看看。

3. 重入鎖的 unlock 方法

該方法調用了AQS 的 release 方法:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

複製代碼

首先嚐試釋放,若是成功,則喚醒下一個線程。

咱們先看看 tryRelease 方法 (須要重寫):

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

複製代碼

該方法步驟以下:

  1. 計算同步器狀態減去1後的值。
  2. 判斷同步器線程和當前線程是否相同,若是不一樣,拋出監視器狀態異常。
  3. 判斷狀態是不是 0,也就是說,若是是0,表示沒有線程持有鎖了,那麼就是設置 free 爲 true,而且設置同步器的 thread 屬性爲null,
  4. 最後設置 state 爲 計算的值,這裏須要考慮重入。最後返回。

能夠看到,若是 state 不是 0 的話,就會返回 false ,後面的步驟就沒有了,也就是說,重入鎖解鎖的時候不會喚醒下一個線程。

若是解鎖成功,執行下面的步驟,若是 head 頭節點不是 null 而且他的狀態不是0,說明有線程能夠喚醒,執行 unparkSuccessor 方法。

private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

複製代碼

該方法步驟以下:

  1. 獲取到頭節點的狀態。
  2. 若是小於0,CAS 設置狀態爲0。
  3. 獲取到頭節點的next 節點,判斷是否爲null,或者 next 節點是否大於0,若是是null 或者大於0,則從末端節點開始向上查找,直到找到狀態小於等於0 的節點。
  4. 最後喚醒該節點的線程。

這個時候,等待在 acquireQueued 方法中,準確的說是 parkAndCheckInterrupt 方法中的 線程被喚醒,開始繼續循環,嘗試拿鎖(須要修改 state 變量),並設置本身爲 head。

這裏還有一個漏掉的地方,就是 waitStatus 變量,何時會大於等於0? 該變量默認是 0,大於 0 的狀態是被取消的狀態。何時會被取消呢? 在acquireQueued 方法中,若是方法沒有正常結束,則會執行 finally 中的 cancelAcquire 方法,該方法會將狀態變成 1,也就是取消狀態。

4 總結

此次咱們分析 AQS,也就是鎖的的真正實現,只分析了 lock 方法和 unlock 方法,這兩個方法是重入鎖的基礎。CountDown 和 Semaphore 是共享鎖,可是基本原理相同,只是將 state 的數字加大即可以實現。而和重入鎖等鎖相關聯的 Condition 則是經過 LockSupport 工具類直接掛起當前線程,並將當前線程添加到等待隊列中,當調用 Condition 的 signal 方法時,則喚醒隊列中的第一個線程。具體源碼咱們有機會再分析。

總之,java 重入鎖的實現基於 AQS,而 AQS 主要基於 state 變量和隊列來實現。實現原理和 pv原語 相似。

good luck!!!!!

相關文章
相關標籤/搜索