Java之AQS原理淺析

 AQS全稱AbstractQueuedSynchronizer,它是實現 JCU包中幾乎全部的有關鎖、多線程併發以及線程同步器等重要組件的基石, 其核心思想就是volatile int state這個屬性配合Unsafe這個工具類來實現對當前鎖的狀態進行修改 。html

一、AQS原理簡述

 AQS內部維護着一個FIFO的CLH隊列,該隊列的基本結構以下。 java

在這裏插入圖片描述
在該隊列中,每個節點表明一個線程,但都不是線程的,只有頭結點纔會進行線程安全處理。

1.一、Node節點

 AQS中使用Node來表示CLH隊列的每一個節點,源碼以下:node

static final class Node {
        //表示共享模式(共享鎖)
        static final Node SHARED = new Node();
        //表示獨佔模式(獨佔鎖)
        static final Node EXCLUSIVE = null;

        //表示線程已取消
        static final int CANCELLED =  1;
        //表示當前結點的後繼節點須要被喚醒
        static final int SIGNAL    = -1;
        //線程(處在Condition休眠狀態)在等待Condition喚醒
        static final int CONDITION = -2;
        //表示鎖的下一次獲取能夠無條件傳播,在共享模式頭結點有可能處於這種狀態
        static final int PROPAGATE = -3;

        //線程等待狀態
        volatile int waitStatus;

        //當前節點的前一個節點
        volatile Node prev;

        //當前節點的下一個節點
        volatile Node next;

        //當前節點所表明的的線程
        volatile Thread thread;

        //能夠理解爲當前是獨佔模式仍是共享模式
        Node nextWaiter;

        //若是節點在共享模式下等待,則返回true。
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        //獲取前一個節點
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        ...
    }
複製代碼

1.二、入隊

 若是當前線程經過CAS獲取鎖失敗,AQS會將該線程以及等待狀態等信息打包成一個Node節點,並將其加入同步隊列的尾部,同時將當前線程掛起。編程

public final void acquire(int arg) {
        //tryAcquire是子類重寫的方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    //將線程及狀態信息打包成一個節點
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //快速的將節點插入隊列尾部,
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //快速插入失敗,經過輪詢來插入尾部,性能比快速插入消耗要大一些
        enq(node);
        return node;
    }
    //經過輪詢方式將節點插入隊列尾部
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    //掛起當前線程
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //經過LockSupport來掛起當前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

 整體流程圖以下。 segmentfault

在這裏插入圖片描述

獲取鎖失敗並添加節點到同步隊列尾部的操做

1.三、出隊

 當釋放鎖時,執行出隊操做及喚醒後繼節點。安全

public final boolean release(int arg) {
        //tryRelease是子類實現的方式
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //線程喚醒及出隊操做
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

 整體流程圖以下。 多線程

在這裏插入圖片描述

釋放鎖時的移除節點操做

1.四、同步狀態管理

 或許對圖中的同步器有所疑惑。它究竟是什麼?其實很簡單,它就是給首尾兩個節點加上volatile同步域,以下。併發

private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;
複製代碼

 上面三個變量是AQS中很是重要的三個變量,前面兩個變量好理解,下面就來講一下state變量,該變量是一個計數器,在獨佔鎖狀況下,獲取鎖後,state的值就會爲1,釋放鎖時就設置爲0(這種鎖屬於不可重入鎖);在共享鎖狀況下,每個線程獲取到鎖,就會state++,釋放鎖時就state--;在可重入鎖狀況下(獲取的鎖都是同一把鎖),每獲取一次鎖會state++,釋放鎖時state--。ide

protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    //使用CAS
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
複製代碼

二、自定義獨佔鎖及共享鎖

 經過上一節的講解,想必對AQS有了必定的瞭解,下面就經過AQS來實現一個獨佔鎖及共享鎖。工具

 AQS很是強大,只須要重寫tryAcquiretryRelease這兩個方法就能夠實現一個獨佔鎖。代碼以下:

public class SingleLock implements Lock {
    //自定義的獨佔鎖
    static class Sync extends AbstractQueuedSynchronizer {
        //獨佔鎖
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //獨佔鎖
        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //判斷是是不是獨佔鎖。
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private Sync sync;

    public SingleLock() {
        sync = new Sync();
    }
    //加鎖
    @Override
    public void lock() {
        sync.acquire(1);
    }
    //獲取可中斷鎖
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    //獲取鎖,可能失敗
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    //在time時間內不能獲取鎖則失敗
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    //釋放鎖
    @Override
    public void unlock() {
        sync.release(1);
    }
    //Condition來實現阻塞喚醒機制
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
複製代碼

 很簡單的代碼就實現了一個獨佔鎖,SingleLock擁有ReentrantLock的大部分功能,而且用法如出一轍。是否是很簡單...

 JUC包中提供的閉鎖(CountDownLatch)信號量(Semaphore)就是典型的共享鎖的實現。共享鎖的實現也很簡單,須要重寫tryAcquireSharedtryReleaseShared這兩個方法。下面就來實現一個共享鎖。代碼以下:

public class ShareLock implements Lock {
    static class Sync extends AbstractQueuedSynchronizer {
        private int count;

        Sync(int count) {
            this.count = count;
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (; ; ) {
                int current = getState();
                int newCount = current - arg;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (; ; ) {
                int current = getState();
                int newCount = current + arg;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private int count;
    private Sync sync;

    public ShareLock(int count) {
        this.count = count;
        sync = new Sync(count);
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
複製代碼

ShareLock容許count個線程同時獲取鎖,它的實現也很簡單吧。經過上面這兩個例子,咱們就能夠按照本身需求來實現不一樣的鎖,但JUC包中提供的類基本上能知足絕大部分需求了。

三、鎖的可重入性

SingleLock是咱們本身實現的一種獨佔鎖,但若是把它用在遞歸中,就會產生死鎖。由於SingleLock不具有可重入性。那麼該如何實現可重入性尼?來看ReentrantLock的實現。

final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
            if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //可重入性的實現,若是當前線程已拿到鎖
        else if (current == getExclusiveOwnerThread()) {
            //狀態加1
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                 throw new Error("Maximum lock count exceeded");
            //從新設置狀態
            setState(nextc);
            return true;
        }
         return false;
    }

複製代碼

 能夠發現可重入性的實現仍是蠻簡單的,首先判斷當前線程是否是已經拿到鎖,若是已經拿到鎖就將state的值加1。可重入性這一點很是重要,不然會產生沒必要要的死鎖問題,Synchronize也具有可重入性。

四、鎖的公平性

SingleLock屬於一個非公平鎖,那麼如何實現公平鎖尼?其實這更簡單,只須要加個判斷便可。來看ReentrantLock的公平鎖的實現。

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //若是當前線程以前還有節點則hasQueuedPredecessors返回true,就不會去競爭鎖
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
複製代碼

hasQueuedPredecessors就是判斷鎖是否公平的關鍵,若是在當前線程以前還有排隊的線程就返回true,這時候當前線程就不會去競爭鎖。從而保證了鎖的公平性。

五、驚羣效應

 在使用wait/notify/notifyAll時,喚醒線程都是使用notifyAll來喚醒線程,由於notify沒法喚醒指定線程,從而可能致使死鎖。但使用notifyAll也有一個問題,那就是當大量線程來獲取鎖時,就會產生驚羣效應,大量的競爭必然形成資源的劇增和浪費,所以終究只能有一個線程競爭成功,其餘線程仍是要老老實實的回去等待。而AQS的FIFO的等待隊列給解決在鎖競爭方面的驚羣效應問題提供了一個思路:保持一個FIFO隊列,隊列每一個節點只關心其前一個節點的狀態,線程喚醒也只喚醒隊頭等待線程。

【參考資料】

深刻學習java同步器AQS

淺談Java併發編程系列(九)—— AQS結構及原理分析

Java併發編程之AQS

扒一扒ReentrantLock以及AQS實現原理

相關文章
相關標籤/搜索