AQS分析筆記

1 介紹

AQS: AbstractQueuedSynchronizer,即隊列同步器。是構建鎖或者其餘同步組件的基礎框架。它維護了一個volatile int state(表明共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。html

state的訪問方式有:java

  • getState()
  • setState()
  • compareAndSetState()

自定義同步器須要根據須要重寫如下方法node

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘
    可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false

而後能夠調用 acquire、release、releaseShared等方法來實現功能。c#

AQS定義兩種資源共享方式:Exclusive和Sharesegmentfault

2 Exclusive獨佔鎖

經過ReentrantLock來分析獨佔鎖。多線程

2.1 lock

ReentrantLock的基本使用形式是:框架

ReentrantLock lock = new ReentrantLock();
	try {
            lock.lock(); // 加鎖
        } catch (Exception e) {

        } finally {
            lock.unlock(); // 解鎖
        }

ReentrantLock內部類Sync繼承了AQS,ReentrantLock#lock即調用了Sync#lock。Sync又有兩個子類分別是NonfairSync和FairSync,分別實現了非公平鎖和公平鎖。oop

看下NonfairSync的lockui

static final class NonfairSync extends Sync {
       // ...
        final void lock() {
            if (compareAndSetState(0, 1)) // lock的時候直接使用cas去搶佔state,成功就返回了,表示搶鎖成功
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1); // 搶佔state狀態失敗才調用AQS的acquire方法
        }
		// ...
    }

再看下FairSync的lock線程

static final class FairSync extends Sync {
       // ...
        final void lock() {
            acquire(1); // 直接調用AQS的acquire
        }
		// ... 
    }

2.2 acquire

acquire是lock調用的關鍵。自定義鎖須要經過acquire來設置state和將節點加入FIFO等待隊列操做。

public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 用戶自定義內容,返回true表示獲取鎖成功,不然加入等待隊列
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 建立節點,加入等待隊列
            selfInterrupt(); // if上述true,則中斷線程
    }

上面代碼能夠分開來看就會簡單點。

  • 首先調用用戶自定義的tryAcquire
  • 若是獲取鎖失敗則addWaiter,意思是把當前線程加入等待隊列,該方法返回建立的Node
  • acquireQueued會將節點對應的線程,即當前線程park。

2.2.1 tryAcquire

首先看下NonfairSync的tryAcquire

static final class NonfairSync extends Sync {
        // ...
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
}

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // state==0表示能夠獲取鎖
                if (compareAndSetState(0, acquires)) { // cas設置鎖,成功則加鎖成功
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 若是不是0,可是仍是當前線程,則可重入
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false; // 不然加鎖失敗返回fasle,就要進行加入等待隊列的處理
}

再看下FairSync的tryAcquire

static final class FairSync extends Sync {
       // ..
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() && // state==0而且等待隊列沒有其餘線程纔會加鎖
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

總結來說,tryAcquire是用戶自定義的,根據state設置鎖狀態,根據返回值來決定是否加入等待隊列。公平鎖和非公平鎖的差別主要在:新來的鎖會不會插隊。

2.2.2 addWaiter

addWaiter用於初始化隊列並增長新的node節點到等待隊列中

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
		// 若是tail不是null則在tail後加入該節點;設想添加第一個節點的時候,tail爲null,則走不到這裏,則會調用下面的enq(node)初始化後再加入節點
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // cas添加失敗則調用enq(node)死循環添加
                pred.next = node;
                return node;
            }
        }
        enq(node); // 初始化和死循環添加
        return node;
}

private Node enq(final Node node) {
        for (;;) { // 直到添加成功爲止
            Node t = tail;
            if (t == null) { // Must initialize // 若是tail是null,則先初始化
                if (compareAndSetHead(new Node())) // 用一個空的node做爲head
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

注意:等待隊列中的頭結點是初始化的空節點或者已經獲取到鎖的節點,不是正在等待獲取鎖的節點,即第一個節點是dummy node。

2.2.3 acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // 先嚐試獲取鎖,若是前節點是head而且獲取到鎖,則當前節點成爲head,這也和2.2.2的"注意"相呼應。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
				// 沒有獲取到鎖,判斷是否能夠park線程,符合條件則當前線程被park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

看下線程被park的條件,即shouldParkAfterFailedAcquire。在這以前咱們須要簡單瞭解下Node的waitStatus字段
waitStatus一共四個狀態

  • CANCELLED = 1;
  • SIGNAL = -1;
  • CONDITION = -2;
  • PROPAGATE = -3;

這裏咱們關心兩個狀態:

  1. CANCELLED,表示當前節點的線程被取消了,也是惟一的正數值。
  2. SIGNAL,表示後續節點須要被喚醒
  3. 另外waitStatus初始化值爲0
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 前節點狀態爲SIGNAL才能夠阻塞, 由於初始化值爲0,因此第一次是不會直接返回true
            return true;
        if (ws > 0) { // 前節點取消了,則一直往前遍歷,直到找到waitStatus不大於0的
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 設置節點爲SIGNAL
        }
        return false;
}

這裏注意到因爲waitStatus初始值爲0,shouldParkAfterFailedAcquire第一次判斷的時候是返回fasle的,即線程不是立刻被park,在第二次的時候纔會被park。從中也能夠看到線程先自旋2次,最後再park:第一次是先嚐試獲取鎖的地方,即if (p == head && tryAcquire(arg))的位置,第二次是由於shouldParkAfterFailedAcquire返回false,因此須要再運行一次。

2.2.4 總結acquire

到如今爲止,咱們能夠看到,沒有獲取到鎖的線程是以節點的形式加入到了等待隊列,而且park了,不佔用cpu時間。

2.3 unlock

unlock調用了AQS的release方法

public void unlock() {
        sync.release(1);
}

public final boolean release(int arg) {
        if (tryRelease(arg)) { // 用戶自定義
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 喚醒head節點的後續節點
            return true;
        }
        return false;
}

2.3.1 tryRelease

tryRelease由用戶自定義,被AQS中release調用,來看下ReentrantLock中tryRelease的調用

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c); // 這裏不須要使用cas,由於是獨佔鎖,釋放鎖的時候,確定只有一個線程訪問
            return free;
}

tryRelease簡單來講就是設置state,可是注意由於是獨佔鎖,因此並不須要使用cas來設置state。

2.3.2 unparkSuccessor

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next; // 喚醒後續節點
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

2.4 獨佔鎖總結

到這裏能夠看到,AQS維護了一個正在獲取鎖的線程的等待隊列,獲取到鎖的線程是head節點,當釋放鎖的時候會喚醒其後續節點,經過這樣的過程達到了獨佔鎖的效果。而且使用了cas和自旋減小了資源的損耗。

3 Share共享鎖

3.1 區別

  • 當獨佔鎖已經被某個線程持有時,其餘線程只能等待它被釋放後,才能去爭鎖,而且同一時刻只有一個線程能爭鎖成功
  • 對於共享鎖而言,因爲鎖是能夠被共享的,所以它能夠被多個線程同時持有。換句話說,若是一個線程成功獲取了共享鎖,那麼其餘等待在這個共享鎖上的線程就也能夠嘗試去獲取鎖,而且極有可能獲取成功

其實從代碼上來總結,最大的區別就是,共享鎖在被喚醒後不但會像獨佔鎖那樣將本身的節點設置爲head,並且會繼續喚醒它的後續節點,後續節點又會喚醒後續節點的節點。這樣當一個共享鎖獲取到鎖後,全部等待的線程都將獲取到鎖。

3.2 CountDownLatch

能夠經過分析CountDownLatch來分析下共享鎖。CountDownLatch的使用形式能夠當作是獲取鎖和釋放鎖的過程,這樣就更容易理解共享鎖了。

CountDownLatch countDownLatch = new CountDownLatch(1);

countDownLatch.await(); // 獲取鎖,能夠是多個線程都在調用

countDownLatch.countDown(); // 釋放鎖, 當釋放後全部獲取共享鎖的線程都會獲取到鎖

3.2.1 countDownLatch.await()

countDownLatch.await()能夠看作是獲取鎖。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // 調用AQS的方法
}

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) // 自定義tryAcquireShared
            doAcquireSharedInterruptibly(arg); // 和獨佔鎖的tryAcquire基本思想是一致的,若是獲取鎖失敗就加入到等待隊列中
}

看下countDownLatch自定義的tryAcquireShared,仍是比較簡單的

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1; // state爲0則獲取鎖成功,不然則是獲取鎖失敗
}

而後看下doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // addWaiter和獨佔鎖調用的同一個方法,只是節點類型爲SHARED
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 和獨佔鎖的主要區別所在,獨佔鎖只是setHead,而共享鎖會setHeadAndPropagate,即設置head而且會傳播,將後續的共享鎖也喚醒
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }

				// 主要思想和獨佔鎖是一致的
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

看下setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node); // 設置當前節點爲head
        
		// 若是propagate>0纔會喚醒後續shared節點,這裏propagate爲用戶自定義的tryAcquireShared的返回值
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared(); // 喚醒head節點的後續節點, releaseShared一樣調用了該方法
        }
    }

這裏主要注意propagate值,即tryAcquireShared的返回值。若是tryAcquireShare<則表示沒有獲取到鎖;若是tryAcquireShare==0則表示獲取鎖成功,可是不會喚醒後續shared節點,這點從上述代碼中能夠看到;若是tryAcquireShare>0,則表示獲取鎖成功且喚醒後續share節點。

3.2.2 countDownLatch.countDown()

countDownLatch.countDown能夠當作是釋放鎖的過程,只不過若是count值不爲1的話,須要釋放屢次纔算釋放成功。

public void countDown() {
        sync.releaseShared(1); // 調用了AQS的releaseShared
}

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared(); // 和setHeadAndPropagate調用的是同一個方法
            return true;
        }
        return false;
    }
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 和獨佔鎖同樣,喚醒head後續節點
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

4 總結

AQS的整體思路是將等待的線程封裝成Node節點放在等待隊列上。獲取鎖的節點爲head節點,釋放鎖的時候,head節點會喚醒後續節點關聯的線程

須要區別的是:獨佔鎖只會喚醒後續節點的線程;而共享鎖後續節點被喚醒後會接着繼續喚醒他本身的後續節點,一直到把全部連續的共享節點都喚醒。

5 參考

  1. http://www.javashuo.com/article/p-xcevmtwv-gz.html
  2. http://www.javashuo.com/article/p-sihcqnms-q.html
相關文章
相關標籤/搜索