深刻理解ReentrantLock

在Java中一般實現鎖有兩種方式,一種是synchronized關鍵字,另外一種是Lock。兩者其實並無什麼必然聯繫,可是各有各的特色,在使用中能夠進行取捨的使用。首先咱們先對比下二者。java

實現:#####

首先最大的不一樣:synchronized是基於JVM層面實現的,而Lock是基於JDK層面實現的。曾經反覆的找過synchronized的實現,惋惜最終無果。但Lock倒是基於JDK實現的,咱們能夠經過閱讀JDK的源碼來理解Lock的實現。node

使用:#####

對於使用者的直觀體驗上Lock是比較複雜的,須要lock和realse,若是忘記釋放鎖就會產生死鎖的問題,因此,一般須要在finally中進行鎖的釋放。可是synchronized的使用十分簡單,只須要對本身的方法或者關注的同步對象或類使用synchronized關鍵字便可。可是對於鎖的粒度控制比較粗,同時對於實現一些鎖的狀態的轉移比較困難。例如:安全

特色:#####
tips synchronized Lock
鎖獲取超時 不支持 支持
獲取鎖響應中斷 不支持 支持
優化:#####

在JDK1.5以後synchronized引入了偏向鎖,輕量級鎖和重量級鎖,從而大大的提升了synchronized的性能,同時對於synchronized的優化也在繼續進行。期待有一天能更簡單的使用java的鎖。併發

在之前不瞭解Lock的時候,感受Lock使用實在是太複雜,可是瞭解了它的實現以後就被深深吸引了。less

Lock的實現主要有ReentrantLock、ReadLock和WriteLock,後二者接觸的很少,因此簡單分析一下ReentrantLock的實現和運行機制。oop

ReentrantLock類在java.util.concurrent.locks包中,它的上一級的包java.util.concurrent主要是經常使用的併發控制類.性能

Paste_Image.png

下面是ReentrantLock的UML圖,從圖中能夠看出,ReentrantLock實現Lock接口,在ReentrantLock中引用了AbstractQueuedSynchronizer的子類,全部的同步操做都是依靠AbstractQueuedSynchronizer(隊列同步器)實現。學習

Paste_Image.png

研究一個類,須要從一個類的靜態域,靜態類,靜態方法和成員變量開始。優化

private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes this lock instance from a stream.
         * @param s the stream
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

從上面的代碼能夠看出來首先ReentrantLock是可序列化的,其次是ReentrantLock裏有一個對AbstractQueuedSynchronizer的引用。ui

看完了成員變量和靜態域,咱們須要瞭解下構造方法:

/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

從上面代碼能夠看出,ReentrantLock支持兩種鎖模式,公平鎖和非公平鎖。默認的實現是非公平的。公平和非公平鎖的實現以下:

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

AbstractQueuedSynchronizer 是一個抽象類,因此在使用這個同步器的時候,須要經過本身實現預期的邏輯,Sync、FairSync和NonfairSync都是ReentrantLock爲了實現本身的需求而實現的內部類,之因此作成內部類,我認爲是隻在ReentrantLock使用上述幾個類,在外部沒有使用到。
咱們着重關注默認的非公平鎖的實現:
在ReentrantLock調用lock()的時候,調用的是下面的代碼:

/**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds the lock then the hold
     * count is incremented by one and the method returns immediately.
     *
     * <p>If the lock is held by another thread then the
     * current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired,
     * at which time the lock hold count is set to one.
     */
    public void lock() {
        sync.lock();
    }

sync的實現是NonfairSync,因此調用的是NonfairSync的lock方法:

/**
     * Sync object for non-fair locks
     * tips:調用Lock的時候,嘗試獲取鎖,這裏採用的CAS去嘗試獲取鎖,若是獲取鎖成功
     *       那麼,當前線程獲取到鎖,若是失敗,調用acquire處理。
     * 
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
        	
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

接下來看看compareAndSetState方法是怎麼進行鎖的獲取操做的:

/**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a <tt>volatile</tt> read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     *         
     * tips: 1.compareAndSetState的實現主要是經過Unsafe類實現的。
     *       2.之因此命名爲Unsafe,是由於這個類對於JVM來講是不安全的,咱們平時也是使用不了這個類的。
     *       3.Unsafe類內封裝了一些能夠直接操做指定內存位置的接口,是否是感受和C有點像了?
     *       4.Unsafe類封裝了CAS操做,來達到樂觀的鎖的爭搶的效果
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

主要的說明都在方法的註釋中,接下來簡單的看一下 compareAndSwapInt的實現:

/**
     * Atomically update Java variable to <tt>x</tt> if it is currently
     * holding <tt>expected</tt>.
     * @return <tt>true</tt> if successful
     */
    public final native boolean compareAndSwapInt(Object o, long offset,
                                                  int expected,
                                                  int x);

一個native方法,沮喪.....可是從註釋看意思是,以CAS的方式將制定字段設置爲指定的值。同時咱們也明白了這個方法多是用java實現不了,只能依賴JVm底層的C代碼實現。下面看看操做的stateOffset:

private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            //這個方法頗有意思,主要的意思是獲取AbstractQueuedSynchronizer的state成員的偏移量
            //經過這個偏移量來更新state成員,另外state是volatile的來保證可見性。
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

stateOffset 是AbstractQueuedSynchronizer內部定義的一個狀態量,AbstractQueuedSynchronizer是線程的競態條件,因此只要某一個線程CAS改變狀態成功,同時在沒有釋放的狀況下,其餘線程必然失敗(對於Unsafe類還不是很熟悉,後面還須要系統的學習)。
對於競爭成功的線程會調用 setExclusiveOwnerThread方法:

/**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access. A
     * <tt>null</tt> argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * <tt>volatile</tt> field accesses.
     */
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

這個實現是比較簡單的,只是獲取當前線程的引用,令AbstractOwnableSynchronizer中的exclusiveOwnerThread引用到當前線程。競爭失敗的線程,會調用acquire方法,這個方法也是ReentrantLock設計的精華之處:

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * tips:此處主要是處理沒有獲取到鎖的線程
     * 	 tryAcquire:從新進行一次鎖獲取和進行鎖重入的處理。
     *      addWaiter:將線程添加到等待隊列中。
     * 	 acquireQueued:自旋獲取鎖。      
     *      selfInterrupt:中斷線程。
     *      三個條件的關係爲and,若是 acquireQueued返回true,那麼線程被中斷selfInterrupt會中斷線程
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

AbstractQueuedSynchronizer爲抽象方法,調用tryAcquire時,調用的爲NonfairSync的tryAcquire。

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
/**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

nonfairTryAcquire方法主要是作重入鎖的實現,synchronized自己支持鎖的重入,而ReentrantLock則是經過此處實現。在鎖狀態爲0時,從新嘗試獲取鎖。若是已經被佔用,那麼作一次是否當前線程爲佔用鎖的線程的判斷,若是是同樣的那麼進行計數,固然在鎖的relase過程當中會進行遞減,保證鎖的正常釋放。
若是沒有從新獲取到鎖或者鎖的佔用線程和當前線程是一個線程,方法返回false。那麼把線程添加到等待隊列中,調用addWaiter:

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

這裏主要是用當前線程構建一個Node的等待隊列雙向鏈表,這裏addWaiter中和enq中的部分邏輯是重複的,我的感受多是若是能一次成功就避免了enq中的死循環。由於tail節點是volatile的同時node也是不會發生競爭的因此node.prev = pred;是安全的。可是tail的next是不斷競爭的,因此利用compareAndSetTail保證操做的串行化。接下來調用acquireQueued方法:

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

此處是作Node節點線程的自旋過程,自旋過程主要檢查當前節點是否是head節點的next節點,若是是,則嘗試獲取鎖,若是獲取成功,那麼釋放當前節點,同時返回。至此一個非公平鎖的鎖獲取過程結束。
若是這裏一直不斷的循環檢查,實際上是很耗費性能的,JDK的實現確定不會這麼「弱智」,因此有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,這兩個方法就實現了線程的等待從而避免無限的輪詢:

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

首先,檢查一下當前Node的前置節點pred是不是SIGNAL,若是是SIGNAL,那麼證實前置Node的線程已經Park了,若是waitStatus>0,那麼當前節點已經Concel或者中斷。那麼不斷調整當前節點的前置節點,將已經Concel的和已經中斷的線程移除隊列。若是waitStatus<0,那麼設置waitStatus爲SIGNAL,由於調用shouldParkAfterFailedAcquire的方法爲死循環調用,因此終將返回true。接下來看parkAndCheckInterrupt方法,當shouldParkAfterFailedAcquire返回True的時候執行parkAndCheckInterrupt方法:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

此方法比較簡單,其實就是使當前的線程park,即暫停了線程的輪詢。當Unlock時會作後續節點的Unpark喚醒線程繼續爭搶鎖。
接下來看一下鎖的釋放過程,鎖釋放主要是經過unlock方法實現:

/**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }

主要是調用AbstractQueuedSynchronizer同步器的release方法:

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease方法爲ReentrantLock中的Sync的tryRelease方法:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

tryRelease方法主要是作了一個釋放鎖的過程,將同步狀態state -1,直到減到0爲止,這主要是兼容重入鎖設計的,同時setExclusiveOwnerThread(null)清除當前佔用的線程。這些head節點後的線程和新進的線程就能夠開始爭搶。這裏須要注意的是對於同步隊列中的線程來講在setState(c),且c爲0的時候,同步隊列中的線程是沒有競爭鎖的,由於線程被park了尚未喚醒。可是此時對於新進入的線程是有機會獲取到鎖的。
下面代碼是進行線程的喚醒:

Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;

由於在setState(c)釋放了鎖以後,是沒有線程競爭的,因此head是當前的head節點,先檢查當前的Node是否合法,若是合法則unpark it。開始鎖的獲取。就回到了上面的for循環執行獲取鎖邏輯:

Paste_Image.png

至此鎖的釋放就結束了,能夠看到ReentrantLock是一個不斷的循環的狀態模型,裏面有不少東西值得咱們學習和思考。

ReentrantLock具備公平和非公平兩種模式,也各有優缺點:
公平鎖是嚴格的以FIFO的方式進行鎖的競爭,可是非公平鎖是無序的鎖競爭,剛釋放鎖的線程很大程度上能比較快的獲取到鎖,隊列中的線程只能等待,因此非公平鎖可能會有「飢餓」的問題。可是重複的鎖獲取能減少線程之間的切換,而公平鎖則是嚴格的線程切換,這樣對操做系統的影響是比較大的,因此非公平鎖的吞吐量是大於公平鎖的,這也是爲何JDK將非公平鎖做爲默認的實現。

最後:#####

關於併發和Lock還有不少的點仍是比較模糊,我也會繼續學習,繼續總結,若是文章中有什麼問題,還請各位看客及時指出,共同窗習。

相關文章
相關標籤/搜索