Java併發基礎框架AbstractQueuedSynchronizer初探(ReentrantLock的實現分析)

AbstractQueuedSynchronizer是實現Java併發類庫的一個基礎框架,Java中的各類鎖(RenentrantLock, ReentrantReadWriteLock)以及同步工具類(Semaphore, CountDownLatch)等不少都是基於AbstractQueuedSynchronizer實現的。AbstractQueuedSynchronizer 通常簡稱AQS,Abstract表示他是一個抽象類,Queued表示他是基於先進先出 FIFO 等待隊列實現的,Synchronizer表示他是一個同步器。html

基於隊列的意思是,咱們用鎖來講明,好比多個線程想要得到同一個對象上的鎖,那麼這些線程會按照申請鎖的前後順序在該鎖對象中的一個FIFO隊列上排隊等待(也就是將這些線程對象的引用插入到該鎖的隊列中)。AQS是Java併發的基礎框架,同時AOS的實現的基礎倒是 sun.misc.Unsafe 和 volatile,固然還有LockSupport工具類,LockSupport也是藉助於Unsafe,主要實現線程的「阻塞」(park)和線程的「喚醒阻塞」(unpark)。基本原理是 sun.misc.Unsafe 保證了內存操做的「原子性」,而volatile保證了內存「可見性」。Unsafe的源碼能夠參見:http://www.docjar.com/html/api/sun/misc/Unsafe.java.html ,它提供了各類原子性的內存CAS操做。java

本文從ReentrantLock的實現來初步探索AbstractQueuedSynchronizer。爲了好把握方向,咱們將ReentrantLock的源碼(Java1.8.0_40)簡化以下:node

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            // ... ...
        }

        protected final boolean tryRelease(int releases) {
           // ... ...
        }
        // ... ...
    }

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
           // ... ...
        }
    }

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    public void lock() {
        sync.lock();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.release(1);
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
// ... ... }

能夠明顯的看到,ReentrantLock 實現的因此接口都是藉助於他的實例屬性——同步器sync來實現的,從構造函數能夠看出,ReentrantLock默認是非公平鎖——使用非公平同步器NonfairSync,傳入true時獲得的是公平鎖——使用公平同步器FairSync。而這二者都是繼承於抽象類Sync,而抽象類Sync又繼承於咱們的AbstractQueuedSynchronizer。咱們先總體看下AQS的實現代碼:api

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    static final class Node {   
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    /**
     * Head of the wait queue, lazily initialized. 
     */
    private transient volatile Node head;
    /**
     * Tail of the wait queue, lazily initialized.  
     */
    private transient volatile Node tail;
    /**
     * The synchronization state.
     */
    private volatile int state;
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
       // ... ...
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // ... ...
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     * @param node the node
     */
    private void setHead(Node node) {
        // ... ...
    }
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; // ... ... } }

AbstractQueuedSynchronizer的實現包含了兩個內部類,Node 類和 ConditionObject類,然後者只有在使用 ReentrantLock.newCondition()時纔會用到,暫時不去管它。Node類主要做爲FIFO隊列上的節點,存儲在鎖上等待的全部線程對象的信息。提供了enq(final Node node)方法用於插入隊列尾部,addWaiter(Node mode)方法用於加入FIFO隊列,setHead(Node node)用於初始化FIFO隊列的頭部。因此AbstractQueuedSynchronizer沒有咱們想象的那麼複雜,它主要是用於實現一個FIFO的等待隊列(咱們暫時放下ConditionObject無論),以及管理同步器的狀態status併發

咱們在看一下他繼承的父類:app

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    protected AbstractOwnableSynchronizer() { }
    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

很簡單,就是實現了互斥同步器的全部者的功能,好比互斥鎖正被哪一個線程佔有者。框架

咱們大致瞭解了AbstractQueuedSynchronizer以後,咱們再從細節上仔細分析ReentrantLock的實現。less

1)ReentrantLock.lock實現分析函數

ReentrantLock分爲公平和非公平的鎖,NonfairSync 和 FairSync的lock實現分別以下:工具

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

 

NonfairSync.lock 和 FairSync.lock實現差異只有兩行代碼:

if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());

就是這兩行代碼使得了 NonfairSync.lock 的鎖的實現是非公平的,這兩行代碼的意思是:若是sync同步器的狀態爲0,也就是鎖沒有被佔,那麼就設置爲1,也就是馬上得到鎖,而且設置鎖的擁有者。也就是說非公平鎖,能夠 不進入等待隊列而直接獲取鎖,而且不論是否在他的前面已經有其它線程在等待着獲取該鎖,這就是「不公平」鎖的緣由之一。緣由之二是它們的調用 acquire(1); 都是在 AQS 中,都分別調用了子類中的tryAcquire,而NonfairSync.tryAcquire 和 FairSync.tryAcquire實現又不一樣:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這裏含義是:tryAcquire(arg)嘗試去得到鎖,而且調用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),將該申請鎖的線程插入FIFO等待隊列。而NonfairSync.tryAcquire的實現以下:

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

而FairSync.tryAcquire的實現以下:

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

能夠明顯看到公平鎖的實現:

            if (c == 0) {
                if (!hasQueuedPredecessors()

即便 c==0 ,也就是鎖沒有被佔有,它也要調用hasQueuedPredecessors()去判斷是否在本身前面已經有線程在等待隊列上了,因此這裏就是實現了FIFO的公平,先到的先得到鎖。因此公平鎖和非公平鎖的實如今上面的兩個對方是有區別的。

分析完了鎖的公平和非公平的緣由,咱們再接着上面看如何實現加入FIFO隊列,以及如何實現等待:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire(arg)剛纔分析完了,咱們再看addWaiter(Node.EXCLUSIVE)

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

很簡單,就是構造一個Node節點,而後插入到等待隊列的尾部。

再看acquireQueued(addWaiter(Node.EXCLUSIVE), arg):

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這裏就實現了在鎖上的「阻塞」的功能。在一個死循環中,先判斷Node是不是等待隊列的頭節點,若是是的話,而後調用tryAcquire(arg)去得到鎖,而後就能夠返回了,也就是得到鎖成功了。若是Node不是頭節點的話,線程就要被阻塞了:

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

該函數的功能是將Node的前驅節點的等待狀態pred.waitStatus設置爲SIGNAL。這樣設置的緣由是方便實現Node節點的「喚醒阻塞」(unpark)。設置成功以後調用:parkAndCheckInterrupt(); 開始被「阻塞」:

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

阻塞的實現利用了LockSupport類,而LockSupport類又使用了Unsafe:

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

setBlocker(t, blocker) 設置了當前線程被誰阻塞了。UNSAFE.park(false, 0L);實現阻塞:

 /**
    * Block current thread, returning when a balancing
    * <tt>unpark</tt> occurs, or a balancing <tt>unpark</tt> has
    * already occurred, or the thread is interrupted, or, if not
    * absolute and time is not zero, the given time nanoseconds have
    * elapsed, or if absolute, the given deadline in milliseconds
    * since Epoch has passed, or spuriously (i.e., returning for no
    * "reason"). Note: This operation is in the Unsafe class only
    * because <tt>unpark</tt> is, so it would be strange to place it
    * elsewhere.
    */
 public native void park(boolean isAbsolute, long time);

park方法能夠被 unpark 喚醒,超時也會被喚醒,中斷也會被喚醒。

park方法被喚醒了以後,就會在上面那個死循環中,再次檢查本身是不是 頭結點:

            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }

若是是頭結點的話, 那麼從新調用tryAcquire(arg)去得到鎖,而後返回,表示得到鎖成功了。到這裏 ReentrantLock.lock()方法的實現算是分析完了。

2)ReentrantLock.unlock實現分析

    /**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }
    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

再看 tryRelease:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

很簡單,就是修改 sync 的屬性status。若是stauts等於0了,就表示鎖已經被釋放了。因而就能夠喚醒FIFO隊列的頭節點了,unparkSuccessor(head):

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

這裏 t.waitStatus <= 0 小於0的包括了 咱們在調用shouldParkAfterFailedAcquire時 設置waitStatus 爲SIGNAL,由於SIGNAL==-1,因此這裏的LockSupport.unpark(s.thread)恰好喚醒了前面的 park().

因此lock() 和 unlock()方法也對應起來了。到這裏ReentrantLock的lock和unlock方法分析完成。ReentrantLock的實現藉助於AQS,而AQS有藉助於LockSupport和Unsafe,以及volatile。ReentrantLock使用state表示鎖被同一個線程獲取了多少次,而且記錄了鎖的擁有者(線程)。可重入鎖的可重入的緣由就是由於記錄了鎖的擁有者和記錄鎖被獲取的次數來實現的。另外鎖的公平性的實現就是是否容許鎖申請的插隊。

Semaphore, CountDownLatch的實現相比ReentrantLock而言更加簡單,實現方式也是大致類似的。

其實查看一些JDK關於併發的庫,就能夠知道:Java併發庫的構建的基礎基本就兩個——Unsafe和volatile,前者保證「原子性」,後者保證「可見性」

相關文章
相關標籤/搜索