ReentrantLock 實現原理筆記(一)

java.util.concurrent.locks.ReentrantLock

exclusive : adj. (我的或集體) 專用的,專有的,獨有的,獨佔的; 排外的; 不肯接收新成員(尤指較低社會階層)的; 高檔的; 豪華的; 高級的html

reentrant : 可重入; 可重入的; 重入; 可再入的; 重進入java

一切從 Thread 線程開始

獨佔線程 exclusiveOwnerThread 出場:node

package java.util.concurrent.locks;

/**
 * A synchronizer that may be exclusively owned by a thread.  This
 * class provides a basis for creating locks and related synchronizers
 * that may entail a notion of ownership.  The
 * {@code AbstractOwnableSynchronizer} class itself does not manage or
 * use this information. However, subclasses and tools may use
 * appropriately maintained values to help control and monitor access
 * and provide diagnostics.
 *
 * @since 1.6
 * @author Doug Lea
 */
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    /**
     * Empty constructor for use by subclasses.
     */
    protected AbstractOwnableSynchronizer() { }

    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

這裏的獲取當前鎖的獨佔線程的方法是 final 的:算法

protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }

AQS 繼承這個類:編程

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer {}

快速認識 ReentrantLock

直接上代碼:安全

fun main() {

    val time1 = measureTimeMillis {
        singleThreadSum()
    }

    val time2 = measureTimeMillis {
        multiThreadSumNoLock()
    }

    val time3 = measureTimeMillis {
        multiThreadSumUseLock()
    }

    println("time1:$time1")
    println("time2:$time2")
    println("time3:$time3")
}


fun singleThreadSum() {
    var sum: Long = 0

    for (i in 1..100000) {
        sum += i
    }

    for (i in 100001..200000) {
        sum += i
    }

    println("singleThreadSum: $sum")
}


fun multiThreadSumNoLock() {
    var sum: Long = 0

    val t1 = Thread {
        for (i in 1..100000) {
            sum += i
        }
    }

    val t2 = Thread {
        for (i in 100001..200000) {
            sum += i
        }
    }

    t1.start()
    t2.start()
    t1.join()
    t2.join()


    println("multiThreadSumNoLock:$sum")
}

fun multiThreadSumUseLock() {
    var sum: Long = 0
    val lock = ReentrantLock()

    val t1 = Thread {
        lock.lock()
        try {
            for (i in 1..100000) {
                sum += i
            }
        } finally {
            lock.unlock()
        }
    }

    val t2 = Thread {
        lock.lock()
        try {
            for (i in 100001..200000) {
                sum += i
            }
        } finally {
            lock.unlock()
        }
    }

    t1.start()
    t2.start()
    t1.join()
    t2.join()

    println("multiThreadSumUseLock:$sum")
}

運行結果(每次會有不一樣):併發

singleThreadSum: 20000100000
multiThreadSumNoLock:19496951532
multiThreadSumUseLock:20000100000
time1:2
time2:11
time3:8app

其中, lock() 方法背後發生的事情大概以下圖:框架

lock()

若是沒有線程使用則當即返回,並設置state爲1;
若是當前線程已經佔有鎖,則state加1;若是其餘線程佔有鎖,則當前線程不可用,等待.less

tryLock()

若是鎖可用,則獲取鎖,並當即返回值 true。
若是鎖不可用,則此方法將當即返回值 false.

unlock()

嘗試釋放鎖,若是當前線程佔有鎖則count減一,若是count爲0則釋放鎖。
若是佔有線程不是當前線程,則拋異常.

ReentrantLock 是什麼?

ReentrantLock, 可重入鎖 , 是一個基於AQS( )併發框架的併發控制類.

ReentrantLock內部實現了3個類,分別是:

Sync
NoFairSync
FairSync

其中Sync繼承自AQS,實現了釋放鎖的模板方法tryRelease(int).

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        Sync() {
        }

        abstract void lock();

        final boolean nonfairTryAcquire(int var1) {
            Thread var2 = Thread.currentThread();
            int var3 = this.getState();
            if (var3 == 0) {
                if (this.compareAndSetState(0, var1)) {
                    this.setExclusiveOwnerThread(var2);
                    return true;
                }
            } else if (var2 == this.getExclusiveOwnerThread()) {
                int var4 = var3 + var1;
                if (var4 < 0) {
                    throw new Error("Maximum lock count exceeded");
                }

                this.setState(var4);
                return true;
            }

            return false;
        }

        protected final boolean tryRelease(int var1) {
            int var2 = this.getState() - var1;
            if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            } else {
                boolean var3 = false;
                if (var2 == 0) {
                    var3 = true;
                    this.setExclusiveOwnerThread((Thread)null);
                }

                this.setState(var2);
                return var3;
            }
        }

        protected final boolean isHeldExclusively() {
            return this.getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject(this);
        }

        final Thread getOwner() {
            return this.getState() == 0 ? null : this.getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return this.isHeldExclusively() ? this.getState() : 0;
        }

        final boolean isLocked() {
            return this.getState() != 0;
        }

        private void readObject(ObjectInputStream var1) throws IOException, ClassNotFoundException {
            var1.defaultReadObject();
            this.setState(0);
        }
    }
}

而NoFairSync和FairSync都繼承自Sync,實現各類獲取鎖的方法tryAcquire(int)。

FairSync
static final class FairSync extends ReentrantLock.Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        FairSync() {
        }

        final void lock() {
            this.acquire(1);
        }

        protected final boolean tryAcquire(int var1) {
            Thread var2 = Thread.currentThread();
            int var3 = this.getState();
            if (var3 == 0) {
                if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, var1)) {
                    this.setExclusiveOwnerThread(var2);
                    return true;
                }
            } else if (var2 == this.getExclusiveOwnerThread()) {
                int var4 = var3 + var1;
                if (var4 < 0) {
                    throw new Error("Maximum lock count exceeded");
                }

                this.setState(var4);
                return true;
            }

            return false;
        }
    }
NonfairSync
static final class NonfairSync extends ReentrantLock.Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        NonfairSync() {
        }

        final void lock() {
            if (this.compareAndSetState(0, 1)) {
                this.setExclusiveOwnerThread(Thread.currentThread());
            } else {
                this.acquire(1);
            }

        }

        protected final boolean tryAcquire(int var1) {
            return this.nonfairTryAcquire(var1);
        }
    }

[Ref: https://www.jianshu.com/p/620... ]

Lock

First time a thread wants to acquire the lock, it acquires it without any wait as the lock is not owned by any thread yet. Thread acquires and owns it.

Lock below is called the ReentrantLock, which is either open to be acquired or locked and is determined by its state which is either zero or non-zero. It knows the owning thread.

What is Reentrance Mutext?

When a thread acquires a previously unheld lock, the JVM records the owner thread and sets the acquisition count to one. If that same thread acquires the lock again, the count is incremented, and when the owning thread calls unlock, the count is decremented. When the count reaches zero, the lock is released.

Reentrancy means that locks are acquired on a per-thread rather than per-invocation basis. In other words, if a thread is not reentrant, and tries to acquire a lock that it already holds, the request won’t succeed.

A re-entrant lock can follow either fair or non-fair policy, by default it is non-fair. In this article we will discuss both non-fair and fair locks. Below is the class diagram.

Class diagram

ReentrantLock 實現原理

獲取鎖的背後發生了什麼?

When the second thread fails to acquire lock as the lock is already acquired by the first thread then it adds itself to the waiting node linked list at the end of the list.

Try acquiring lock for the queued thread:

After the thread adds itself to the waiting node linked list, it will still try acquiring lock in exclusive uninterruptible mode.

If the thread trying to acquire lock is not the successor to the header node then it won’t be able to acquire the lock. It will still not be parked yet and the predecessor’s status field will be checked to figure out whether the status represents the correct state.

Thread Parking

If the lock to acquire is currently owned by some other thread then the current thread trying to acquire the lock will be parked, that is, disabled for thread scheduling purposes unless the permit is available. If the permit is available then it is consumed and the call returns immediately, otherwise it remains blocked. The blocker (the lock) which is responsible for this thread parking will be set to the thread.

Data Structure

If the lock is already owned by a thread, any other thread trying to acquire the lock will put on a hold and will be waiting in a queue. The thread would be disabled for thread scheduling purpose. The queue here is a double linked list. The lock will know the owning thread, state (0 or 1), head and tail of the linked list. If the node’ successor node is waiting for the lock then its wait state property (-1) will reflect that.

Waiting Queue: 雙向鏈表

The thread fails to acquire the lock fails as the lock is already acquired by another thread. The thread is added to a double linked list queue where the header is a dummy node with a mode indicating that it is waiting for a signal and the current thread which failed to acquire the lock becomes the tail node and is the next node to header. Since the current thread fails to get the lock, it is parked as it remains blocked till some other thread unblocks it.

Second thread tries to acquire the lock, it too fails so gets queued. It becomes the new tail and the thread is parked.

unlock() 算法

If the current thread is the holder of this lock then the hold count is decremented. If the hold count is now zero then the lock is released. If the current thread is not the holder of this lock then llegalMonitorStateException is thrown.

Once the lock is released, the lock finds out the node which is in waiting status, removed the waiting status and wakes up the node’s successor so that the node woken up can acquire the lock.

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

The thread to unpark is held in successor, which is normally just the next node. But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.

If the node woken up is immediate successor to the head node, it tries to acquire the lock and become the new head. The old head and the variables referring to it are nullified so the gc can claim the memory.

If the owning thread releases the lock, the head’s successor node’s thread will be unparked. The successor node is tail in this case, so we will end up with a single node which is itself head and tail both. The unparked thread will take the ownership of the lock.

Further release of the lock will simply nullify the owning thread and the state of the lock will be set to 0.

公平鎖和非公平鎖

公平鎖是指多個線程按照申請鎖的順序來獲取鎖,線程直接進入FIFO隊列,隊列中的第一個線程才能得到鎖。

用一個打水的例子來理解:

公平鎖的優勢是等待鎖的線程不會夯死。缺點是吞吐效率相對非公平鎖要低,等待隊列中除第一個線程之外的全部線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。

非公平鎖是多個線程加鎖時直接嘗試獲取鎖,獲取不到纔會到等待隊列的隊尾等待。但若是此時鎖恰好可用,那麼這個線程能夠無需阻塞直接獲取到鎖,因此非公平鎖有可能出現後申請鎖的線程先獲取鎖的場景。

非公平鎖的優勢是能夠減小喚起線程的開銷(由於可能有的線程能夠直接獲取到鎖,CPU也就不用喚醒它),因此總體的吞吐效率高。缺點是處於等待隊列中的線程可能會夯死(試想剛好每次有新線程來,它恰巧都每次獲取到鎖,此時還在排隊等待獲取鎖的線程就悲劇了.....),或者等好久纔會得到鎖。

小結: 公平鎖和非公平鎖的差別在因而否按照申請鎖的順序來獲取鎖,非公平鎖可能會出現有多個線程等待時,有一我的品特別的好的線程直接沒有等待而直接獲取到了鎖的狀況,他們各有利弊;Reentrantlock在構造時默認是非公平的,能夠經過參數控制。

[ Ref:https://www.jianshu.com/p/620... ]

Reentrantlock 與 AQS

java.util.concurrent.locks.AbstractQueuedSynchronizer

 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers ....

ref: [http://www.cs.rochester.edu/wcms/research/systems/high_performance_synch/ ]

AQS是AbustactQueuedSynchronizer的簡稱,它是一個Java提升的底層同步工具類,用一個int類型的變量表示同步狀態,並提供了一系列的CAS操做來管理這個同步狀態。AQS的主要做用是爲Java中的併發同步組件提供統一的底層支持.

同步工具類Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask等雖然各自都有不一樣特徵,可是簡單看一下源碼,每一個類內部都包含一個以下的內部子類定義:

abstract static class Sync extends AbstractQueuedSynchronizer

[ ref:https://blog.csdn.net/zhangdo... ]

AQS提供了一種實現阻塞鎖和一系列依賴FIFO等待隊列的同步器的框架,以下圖所示。

同步隊列是AQS很重要的組成部分,它是一個雙端隊列,遵循FIFO原則,主要做用是用來存放在鎖上阻塞的線程,當一個線程嘗試獲取鎖時,若是已經被佔用,那麼當前線程就會被構形成一個Node節點 add 到同步隊列的尾部,隊列的頭節點是成功獲取鎖的節點,當頭節點線程是否鎖時,會喚醒後面的節點並釋放當前頭節點的引用。

Queue 中節點 Node 的狀態機流起色制是 AQS 的核心:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
         .......
}

AQS爲一系列同步器依賴於一個單獨的原子變量(state)的同步器提供了一個很是有用的基礎。子類們必須定義改變state變量的protected方法,這些方法定義了state是如何被獲取或釋放的。鑑於此,本類中的其餘方法執行全部的排隊和阻塞機制。子類也能夠維護其餘的state變量,可是爲了保證同步,必須原子地操做這些變量。

   AbstractQueuedSynchronizer中對state的操做是原子的,且不能被繼承。全部的同步機制的實現均依賴於對改變量的原子操做。爲了實現不一樣的同步機制,咱們須要建立一個非共有的(non-public internal)擴展了AQS類的內部輔助類來實現相應的同步邏輯。AbstractQueuedSynchronizer並不實現任何同步接口,它提供了一些能夠被具體實現類直接調用的一些原子操做方法來重寫相應的同步邏輯。AQS同時提供了互斥模式(exclusive)和共享模式(shared)兩種不一樣的同步邏輯。通常狀況下,子類只須要根據需求實現其中一種模式,固然也有同時實現兩種模式的同步類,如ReadWriteLock。接下來將詳細介紹AbstractQueuedSynchronizer的提供的一些具體實現方法。

state狀態

  AbstractQueuedSynchronizer維護了一個volatile int類型的變量,用戶表示當前同步狀態。volatile雖然不能保證操做的原子性,可是保證了當前變量state的可見性。至於volatile的具體語義,能夠參考相關文章。state的訪問方式有三種:

  • getState()
  • setState()
  • compareAndSetState()

  這三種叫作均是原子操做,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法。代碼實現以下:

/**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

自定義資源共享方式

  AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
  不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現如下幾種方法:

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。

[ Ref:https://www.jianshu.com/p/da9... ]

ReentrantLock 重入鎖的基本原理是判斷上次獲取鎖的線程是否爲當前線程,若是是則可再次進入臨界區,若是不是,則阻塞。

因爲ReentrantLock是基於AQS實現的,底層經過操做同步狀態來獲取鎖.,下面看一下非公平鎖的實現邏輯:

final boolean nonfairTryAcquire(int acquires) {
            //獲取當前線程
            final Thread current = Thread.currentThread();
            //經過AQS獲取同步狀態
            int c = getState();
            //同步狀態爲0,說明臨界區處於無鎖狀態,
            if (c == 0) {
                //修改同步狀態,即加鎖
                if (compareAndSetState(0, acquires)) {
                    //將當前線程設置爲鎖的owner
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //若是臨界區處於鎖定狀態,且上次獲取鎖的線程爲當前線程
            else if (current == getExclusiveOwnerThread()) {
                 //則遞增同步狀態
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

讀寫鎖

Java提供了一個基於AQS到讀寫鎖實現ReentrantReadWriteLock,該讀寫鎖到實現原理是:將同步變量state按照高16位和低16位進行拆分,高16位表示讀鎖,低16位表示寫鎖。

寫鎖的獲取與釋放

寫鎖是一個獨佔鎖,因此咱們看一下ReentrantReadWriteLock中tryAcquire(arg)的實現:

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

上述代碼的處理流程已經很是清晰:

獲取同步狀態,並從中分離出低16爲的寫鎖狀態
若是同步狀態不爲0,說明存在讀鎖或寫鎖
若是存在讀鎖(c !=0 && w == 0),則不能獲取寫鎖(保證寫對讀的可見性)
若是當前線程不是上次獲取寫鎖的線程,則不能獲取寫鎖(寫鎖爲獨佔鎖)
若是以上判斷均經過,則在低16爲寫鎖同步狀態上利用CAS進行修改(增長寫鎖同步狀態,實現可重入)
將當前線程設置爲寫鎖的獲取線程

寫鎖的釋放過程與獨佔鎖基本相同:

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

在釋放的過程當中,不斷減小讀鎖同步狀態,只爲同步狀態爲0時,寫鎖徹底釋放。

讀鎖的獲取與釋放

讀鎖是一個共享鎖,獲取讀鎖的步驟以下:

獲取當前同步狀態
計算高16爲讀鎖狀態+1後的值
若是大於可以獲取到的讀鎖的最大值,則拋出異常
若是存在寫鎖而且當前線程不是寫鎖的獲取者,則獲取讀鎖失敗
若是上述判斷都經過,則利用CAS從新設置讀鎖的同步狀態
讀鎖的獲取步驟與寫鎖相似,即不斷的釋放寫鎖狀態,直到爲0時,表示沒有線程獲取讀鎖。

[ ref:https://blog.csdn.net/zhangdo... ]

Unsafe 類

咱們能夠看到在 AbstractQueuedSynchronizer 中用到了 Unsafe 類:

/**
     * Setup to support compareAndSet. We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /**
     * CAS next field of a node.
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
}

實現線程安全且高效地設置隊列中節點 Node 的等待狀態 waitStatus .

好比說, ReentrantLock 中非公平鎖的實現代碼中:

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

compareAndSetState(0, 1) 方法:

/**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

Unsafe 類裏面的實現方法基本都是 native 方法:

...
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
...

參考資料

https://www.javarticles.com/2012/09/reentrant-lock.html

https://www.javarticles.com/2016/06/java-reentrantlock-interruption-example.html


Kotlin 開發者社區

國內第一Kotlin 開發者社區公衆號,主要分享、交流 Kotlin 編程語言、Spring Boot、Android、React.js/Node.js、函數式編程、編程思想等相關主題。

Kotlin 開發者社區

相關文章
相關標籤/搜索