Java Concurrency(三)——J.U.C AQS源碼解讀

java5以後的java.util.concurrent包是世界級併發大師Doug Lea的做品,裏面主要實現了java

  1. atomic包裏Integer/Long對應的原子類,主要基於CAS
  2. 一些同步子,包括Lock,CountDownLatch,Semaphore,FutureTask等,這些都是基於AbstractQueuedSynchronizer類;
  3. 關於線程執行的Executors類等;
  4. 一些併發的集合類,好比ConcurrentHashMap,ConcurrentLinkedQueue,CopyOnWriteArrayList等。

今天咱們主要介紹AbstractQueuedSynchronizer這個能夠說是最核心的類,沒有之一。整個concurrent包裏,基本都直接或間接地用到了這個類。Doug Lea的這篇論文裏面講AQS的實現。node

#AQS安全

首先,咱們來想象一下,一間屋裏有一個你們都想要獲得的會讓你很爽的東西(something which makes you so happy, e.g. W.C)。當有人進去把門關起來在獨佔享用的時候,其餘人就只能在外面排隊等待,既然在等待,你就不能總是去敲門說哎,好了沒有啊。總是這樣的話裏面的人就很不爽了,並且你能夠利用這點等待時間乾點別的,好比看看小說視頻背背單詞或者就乾脆椅子上睡覺,當前面獨佔的人爽完以後,就會出來講,啊,好爽,到大家了。而後你們可能按照排隊順序獲取或者你們瘋搶這個狀態,有可能一我的本身進去獨佔,有可能幾我的說,哎不要緊,咱們能夠一塊兒來。而後他們進去爽,爽完以後再出來通知下一個。併發

咱們來把上面這段話翻譯成AQS裏面的術語。有一個狀態state,會有多個Thread嘗試獲取,當一個Thread獨佔(EXCLUSIVE,好比Lock)以後,其餘後面到來的Thread就會被放到一個Queue的隊尾(tail),而後睡眠(park),一直等到前面的Thread喚醒(unpark)它,固然這裏有可能被假喚醒(就比如你定了鬧鐘8點起牀,結果7點就天然醒或者被外面車吵醒),因此這個Thread會判斷一下是否是到本身了,沒有的話就繼續park(在一個死循環裏);當擁有state的Thread釋放(release)以後,它會喚醒Queue中的下一個Thread(unparkSuccessor)。而後下一個Thread獲取(acquire)到state,完成本身的任務,而後繼續unparkSuccessor。前面主要說的是EXCLUSIVE模式,AQS還支持共享(SHARED)模式,區別在於嘗試獲取(tryAcquireShared)的時候即便以前已經有Thread獲取了state,可是可能仍然能獲取(好比ReadLock)。一樣釋放(doReleaseShared)的時候除了通知Queue裏面第一個(head),還會繼續通知後續的節點(Node),只要它們是SHARED。app

AQS就是實現了:ide

  1. 自動管理這個同步狀態state(int類型),更新的時候須要用CAS保證原子性
  2. 阻塞和喚醒線程park/unpark
  3. 隊列管理,一個雙向鏈表實現queue

AQS是一個abstract class,能夠經過繼承AQS,定義state的含義,以及tryAcquire,tryRelease,以及對應的share模式下tryAcquireShared,tryReleaseShared這幾個方法,定義出本身想要的同步子(Synchronizers)。通常而言,是定義一個內部類Sync extends AQS,實現前面說的幾個方法,而後再包一層,暴露出相應的方法。這樣作的好處是你能夠在包裝器類裏面取更直觀的名字,如ReentrantLock裏的lock,unlock和CountDownLatch裏的countDown,await,而不是太通用的acquire和release等。並且AQS裏面一些方法是爲了監控和調試使用,直接暴露出來也很差。函數

下面咱們來看J.U.C裏面兩個經常使用的Synchronizers。oop

#ReentrantLock測試

##使用ui

ReentrantLock的語義跟synchronized關鍵字基本同樣,並且我以前看《深刻理解Java虛擬機》裏面的評測說JDK6以後,二者的效率基本一致了(JDK5以前ReentrantLock要比synchronized快不少)。Javadoc裏面說基本用法以下:

class X {
  private final ReentrantLock lock = new ReentrantLock();
  // ...
  public void m() {
    lock.lock();  // block until condition holds
    try {
      // ... method body
    } finally {
      lock.unlock()
    }
  }
}

##源碼

ReentrantLock用state表示是否被鎖,0表示沒有線程獲取到鎖,>=1表示某個線程獲取了N次鎖(由於是重入的,只要保證lock和unlock成對出現就沒有問題)。

/** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {

定義了一個內部類,基本任務都代理給sync完成。而Sync又是一個abstract class,這裏主要是由於實現了兩種搶佔鎖的機制,公平鎖和非公平鎖。

static final class FairSync extends Sync
	
	static final class NonfairSync extends Sync

所謂公平不公平簡單來講就是本文開頭說的,當資源釋放的時候,你們是按照排隊順序先到先得,仍是有人插隊你們瘋搶。

提供了兩個構造函數:

public ReentrantLock() {
        sync = new NonfairSync();//默認非公平鎖,AQS論文說非公平鎖效率高些,理由其實很簡單,公平鎖通知隊列第一個節點,要把它喚醒,而喚醒是須要時間的,在鎖釋放到第一個節點被喚醒這段時間其實鎖是能夠用可是沒有被用的(available but not used);而非公平鎖,釋放了以後立馬就能夠被別人用,因此提升了效率,可是有可能致使飢餓鎖,這個就要具體看業務需求了。
    }

	public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();//指定公平與否
    }

加鎖的實現

public void lock() {
        sync.lock();
    }

簡單代理給了sync,在FairSync裏爲

final void lock() {
		acquire(1);
	}

acquire的實如今AQS裏面:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire是要在子類裏本身實現的,在FairSync以下;

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//0表示鎖沒有被線程用,1表示已經有線程佔用
                if (!hasQueuedPredecessors() && //判斷本身是不是第一個節點,實現公平
                    compareAndSetState(0, acquires)) {//CAS更新狀態
                    setExclusiveOwnerThread(current);//設置當前線程擁有狀態
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//1表示已經有線程佔用,再判斷一下是否被當前線程佔用,來實現重入(Reentrant)特性
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);//更新狀態
                return true;
            }
            return false;
        }

若是獲取失敗,addWaiter(Node.EXCLUSIVE)將當前線程加入隊尾

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//用當前線程構造Node,獨佔模式
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {//快速判斷,CAS更新tail節點
                pred.next = node;
                return node;
            }
        }
        enq(node);//若是失敗,進入enq方法
        return node;
    }
    
	 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))//若是尚未head,CAS初始化一個head
                    tail = head;
            } else {//這段代碼跟addWaiter裏同樣,CAS更新tail節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

如今咱們已經將獲取不到鎖的線程加入隊尾了,如今要將它掛起acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//在一個死循環中,避免假喚醒
                final Node p = node.predecessor();//獲取當前節點的前一個節點,若是是head說明本身是第一個能夠獲取資源的線程,實現公平
                if (p == head && tryAcquire(arg)) {//是第一個能夠獲取資源的線程而且嘗試獲取成功
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//沒有獲取到資源,睡眠park去
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

	private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

上面完成了獲取鎖的過程,簡單來講就是嘗試獲取,失敗就加入隊尾,掛起,等待被喚醒。

下面來看看釋放鎖

public void unlock() {
        sync.release(1);//代理給sync,調用AQS的release
    }

//下面代碼在AQS中
	public final boolean release(int arg) {
        if (tryRelease(arg)) {//嘗試釋放資源,須要在子類裏實現
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//通知下一個節點
            return true;
        }
        return false;
    }

	private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
         // 主要在這裏找到下一個須要通知的節點,若是node.next就是須要通知的節點,則直接通知;不然,可能node.next == null(緣由是雙向鏈表設置b.pre = a和a.next = b的時候不能保證原子性,只能保證b.pre = a成功,這時候另外一條線程可能看到a.next == null)或者s.waitStatus > 0(緣由是線程等不及被取消了static final int CANCELLED = 1;),這個時候就要從隊尾tail開始找,找到離隊頭head最近的一個須要通知的節點Node。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒線程
    }

看看須要在子類裏實現的tryRelease:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;//釋放鎖,state減去相應的值
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();//避免A線程鎖了以後,B線程故意搗亂釋放鎖
            boolean free = false;
            if (c == 0) {//當前線程已經徹底釋放了鎖
                free = true;
                setExclusiveOwnerThread(null);//釋放鎖的擁有者
            }
            setState(c);//設置狀態,這個方面沒有同步,沒有CAS,有同窗問過豈不是有線程併發問題?其實到這裏,只有一個線程會調用這個方法,因此不會有併發錯誤,仔細想一想,是吧?是吧?
            return free;
        }

到這裏,基本都已經完成,對了,尚未說非公平鎖NonfairSync是怎麼搶佔鎖的。

final void lock() {
            if (compareAndSetState(0, 1))//先搶一把(插隊),萬一成功了就不排隊,不公平性就體如今這裏!
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

跟FairSync.lock()對比,能夠看出,只是在acquire(1)以前,先搶一把,搶不到才乖乖的去排隊。

咱們再看看NonfairSync.tryAcquire()怎麼實現的

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);//調用父類方法nonfairTryAcquire
        }

		final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {//跟FairSync.tryAcquire只有這裏一行有差別,即少了!hasQueuedPredecessors(),也就是說不判斷前面有沒有人,任什麼時候候只要它醒來,都會去搶,因此不公平!============剛又看了一遍,發現其實final boolean acquireQueued(final Node node, int arg)方法裏已經有node.predecessor() == head的判斷,感受這個不公平的tryAcquire貌似沒有意義,各位看官怎麼看呢,請留言哈,謝謝~
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

#CountDownLatch

咱們以前說了,AQS支持獨佔EXCLUSIVE和共享SHARED兩種模式,而剛剛的ReentrantLock的就是獨佔模式,咱們來看看一個使用共享模式的類。

##使用

CountDownLatch就比如一道門,它能夠用來等全部資源都到齊了,纔開門,讓這些線程同時經過。好比以下是CountDownLatch一個通用用法:

package concurrentStudy;

import java.util.concurrent.CountDownLatch;

/**
 * Created by magicalli on 2014/12/13.
 */
public class IndexPlusPlusTest01 {
    private static final int NThreads = 10;// 線程數
    private static final int M = 100000;//循環次數,過小的話(好比10)可能看不出來效果
    private volatile static int n = 0;//加volatile的目的是爲了證實volatile沒有「原子性」!

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(NThreads);

        for (int i = 0; i < NThreads; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        startGate.await();//全部線程start以後等待「門「打開,保證同時真正開始運行
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    for (int j = 0; j < M; j++) {
                        n += 1;
                    }

                    endGate.countDown();
                }
            }).start();
        }

        startGate.countDown();//打開「門」,讓全部線程同時run起來
        long t1 = System.currentTimeMillis();
        endGate.await();//等全部線程都結束以後纔打印n,不然老是會打出錯誤的n;我見過這裏用Thread.sleep(),可是問題在於,你怎麼知道該等多久才能保證全部線程結束以及恰好結束呢?!
        long t2 = System.currentTimeMillis();
        System.out.println("cost time: " + (t2 - t1));
        System.out.println("n: " + n);
    }
}

對了,上面代碼是拿來驗證volatile不具有原子性的,是錯誤的代碼哦。若是想併發安全,你們能夠想一想能夠用哪些方式實現。

##源碼

CountDownLatch一樣也是定義了一個繼承自AQS的內部類Sync:

private static final class Sync extends AbstractQueuedSynchronizer

構造函數以下:

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

count表示有多少個任務還在運行,每一個Thread完成了任務或者準備好開始以前,就會調用countDown方法將count-1,當count==0時候,await就再也不阻塞,全部在上面阻塞的Thread均可以順利經過。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

直接調用AQS的acquireSharedInterruptibly方法,從方法名能夠看出,支持中斷響應

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();//響應中斷
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared在子類中實現:

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;//若是state爲0,說明全部Thread完成任務,能夠不阻塞了
        }

若是沒有獲取到,將Thread加入隊尾,掛起。下面這個方法跟獨佔模式下acquireQueued(addWaiter(Node.EXCLUSIVE), arg))這個方法代碼是基本一致的。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//共享模式
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//跟EXCLUSIVE的一大區別
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();//響應中斷,這裏直接拋異常
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

	private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
         // 若是當前節點是願意共享,而且下一個節點也是願意共享的,那麼就進入doReleaseShared,喚醒下一個節點,下面會詳解
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

前面完成了等待CountDownLatch的count變成0的過程,下面看看countDown

public void countDown() {
        sync.releaseShared(1);//調用AQS的
    }

	// AQS中
	public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//嘗試釋放,須要在子類中實現
            doReleaseShared();//真正釋放
            return true;
        }
        return false;
    }

		// Sync子類中實現
	    protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))// 在死循環中CAS將count-1
                    return nextc == 0;
            }
        }
        
    // AQS中
	private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//遍歷queue,通知全部SHARED的節點,由於是共享模式,這些Node都應該被喚醒,直到遇到某個EXCLUSIVE的Node
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

unparkSuccessor跟以前獨佔模式裏面的是同一個函數,即調用unpark喚醒Thread。

咱們知道爲了不獲取不到鎖長時間等待,通常阻塞的方法都會支持帶超時時間的方法,好比CountDownLatch裏就有

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

調用AQS裏面的tryAcquireSharedNanos方法

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

	private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;//若是已經沒時間了,直接return false
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)//大於某個閾值,才park,不然進入自旋
                    LockSupport.parkNanos(this, nanosTimeout);//調用帶超時的park方法
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

能夠看到,跟不帶超時的doAcquireSharedInterruptibly方法相比,區別主要在於每次for循環期間,檢查時間是否過時和調用帶超時的park。nanosTimeout > spinForTimeoutThreshold這個判斷主要是由於park/unpark自己也須要花時間,爲了更準確地完成超時的機制,在超時時間立刻就要到了的時候,就進入自旋,再也不park了,這應該是Doug Lea測試了park/unpark時間比1000納秒要長吧。

/**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

#總結

J.U.C裏AQS是一個至關核心的類,能夠說沒有它就沒有J.U.C包。推薦你們看看AQS這篇論文(網上有一些翻譯,推薦你們仍是看原文吧)。主要是用一個state表示狀態,子類能夠根據須要來定義state的含義,以及獲取釋放資源時具體如何操做state,固然須要經過CAS實現原子更改。當獲取不到state的時候,線程加入隊列,掛起。釋放以後,喚醒隊列中的線程。AQS支持兩種模式,獨佔EXCLUSIVE和共享SHARED。J.U.C裏自己也有不少直接繼承AQS實現的類,包括Lock,CountDownLatch,Semaphore,FutureTask等,若是這些還不能知足你的使用,那麼能夠直接繼承AQS來實現須要。

#Refers

  1. http://gee.cs.oswego.edu/dl/papers/aqs.pdf
  2. http://ifeve.com/introduce-abstractqueuedsynchronizer/
  3. http://ifeve.com/jdk1-8-abstractqueuedsynchronizer/
  4. http://ifeve.com/jdk1-8-abstractqueuedsynchronizer-part2/
  5. http://book.douban.com/subject/6522893/
  6. http://my.oschina.net/magicly007/blog/364102

Written with StackEdit.

相關文章
相關標籤/搜索