java.util.concurrent.locks.ReentrantReadWriteLock 源碼

相關類圖:

    ReentrantReadWriteLock 實現了ReadWriteLock 接口。其自身有五個內部類,五個內部類之間也是相互關聯的。內部類的關係以下圖所示。html

    

    如上圖所示,Sync繼承自AQS、NonfairSync繼承自Sync類、FairSync繼承自Sync類;ReadLock實現了Lock接口、WriteLock也實現了Lock接口。java

 

java.util.concurrent.locks.ReentrantReadWriteLock 源碼:

package java.util.concurrent.locks;

import java.util.concurrent.TimeUnit;
import java.util.Collection;

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;// 版本序列號
    // 讀鎖
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 寫鎖
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 同步隊列
    final Sync sync;//Sync抽象類繼承自AQS抽象類,Sync類提供了對ReentrantReadWriteLock的支持

   
    public ReentrantReadWriteLock() {//無參構造函數,默認調用非公平策略構造函數
        this(false);
    }

    //設置公平策略或者非公平策略,並建立讀鎖與寫鎖對象實例
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    //實現了ReadWriteLock的writeLock方法,返回一個寫入鎖對象
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

    ////實現了ReadWriteLock的readLock方法,返回一個讀取鎖對象
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

    //Sync類內部存在兩個內部類,分別爲HoldCounter和ThreadLocalHoldCounter
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;// 版本序列號

        static final int SHARED_SHIFT   = 16;// 高16位爲讀鎖,低16位爲寫鎖
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);// 讀鎖單位
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;// 讀鎖最大數量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 寫鎖最大數量

        private transient ThreadLocalHoldCounter readHolds;//本地線程計數器
        private transient HoldCounter cachedHoldCounter; // 緩存的計數器

        private transient Thread firstReader = null;// 第一個讀線程
        private transient int firstReaderHoldCount;// 第一個讀線程的計數

        //佔有讀鎖的線程數量
        static int sharedCount(int c){
           //直接將state右移16位,就能夠獲得讀鎖的線程數量,由於state的高16位表示讀鎖,對應的第十六位表示寫鎖數量
           return c >>> SHARED_SHIFT;
        }

        //佔有寫鎖的線程數量
        static int exclusiveCount(int c) {
           //直接將狀態state和(2^16 - 1)作與運算,其等效於將state模上2^16。寫鎖數量由state的低十六位表示
           return c & EXCLUSIVE_MASK;
        }

        //HoldCounter主要與讀鎖配套使用
        static final class HoldCounter {
            int count = 0;// 某個讀線程重入的次數
            
            // 獲取當前線程的TID屬性的值,用來惟一標識一個線程
            final long tid = getThreadId(Thread.currentThread());
        }

        /*ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類能夠將線程與對象相關聯。在沒有進行set的狀況下,get到的均是initialValue方法裏面生成的那個HolderCounter對象*/
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {// 本地線程計數器
            //重寫初始化方法,在沒有進行set的狀況下,獲取的都是該HoldCounter值
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        // 構造函數
        Sync() {
            readHolds = new ThreadLocalHoldCounter();// 本地線程計數器
            setState(getState()); // 設置AQS的狀態
        }

        //建立IllegalMonitorStateException異常對象實例
        private IllegalMonitorStateException unmatchedUnlockException() {
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        }

        //讀線程是否應該被阻塞
        abstract boolean readerShouldBlock();

        //寫線程是否應該被阻塞
        abstract boolean writerShouldBlock();

        //用於釋放寫鎖資源
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())//當前線程不是寫鎖持有者,則拋出異常
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;//計算釋放資源後的寫鎖的數量
            boolean free = exclusiveCount(nextc) == 0;//若爲0,獲得true

            if (free)//若爲0,表示須要釋放資源;不然,只需修改狀態計數值便可,繼續保持資源的佔用狀態.
                setExclusiveOwnerThread(null);

            setState(nextc);//修改狀態的計數值
            return free;//返回釋放結果
        }

        //用於獲取寫鎖
        protected final boolean tryAcquire(int acquires) {
            // 獲取當前線程
            Thread current = Thread.currentThread();
            int c = getState();// 獲取狀態
            int w = exclusiveCount(c);// 寫線程數量

            if (c != 0) {// 狀態不爲0
                // 寫線程數量爲0,則爲讀鎖佔據;寫線程不爲0,但當前線程沒有佔有該獨佔鎖
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;//獲取寫鎖失敗
                if (w + exclusiveCount(acquires) > MAX_COUNT)// 判斷獲取寫鎖重入次數是否超過最大值限制
                    throw new Error("Maximum lock count exceeded");
                // 設置AQS狀態
                setState(c + acquires);
                return true;//獲取成功
            }

            //此處的c等於0.此時沒有讀鎖線程和寫鎖線程
            //判斷寫線程是否應該被阻塞:非公平策略下老是不會被阻塞,在公平策略下須要進行判斷是否有等待時間更長的讀取線程
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
                return false;//若寫線程須要阻塞,或CAS設置狀態失敗,則返回獲取失敗
            
            setExclusiveOwnerThread(current);// 設置獨佔線程
            return true;//獲取成功
        }

        //讀鎖線程釋放鎖
        /*首先判斷當前線程是否爲第一個讀線程firstReader,如果,則判斷第一個讀線程佔有的資源數firstReaderHoldCount是否爲1,如果,則設置第一個讀線程firstReader爲空,不然,將第一個讀線程佔有的資源數firstReaderHoldCount減1;若當前線程不是第一個讀線程,那麼首先會獲取緩存計數器(上一個讀鎖線程對應的計數器 ),若計數器爲空或者tid不等於當前線程的tid值,則獲取當前線程的計數器,若是計數器的計數count小於等於1,則移除當前線程對應的計數器,若是計數器的計數count小於等於0,則拋出異常,以後再減小計數便可。不管何種狀況,都會進入無限循環,該循環能夠確保成功設置狀態state*/
        protected final boolean tryReleaseShared(int unused) {
            //獲取當前線程
            Thread current = Thread.currentThread();
            if (firstReader == current) {// 當前線程爲第一個讀線程
                if (firstReaderHoldCount == 1)// 讀線程佔用的資源數爲1
                    firstReader = null;
                else// 減小佔用的資源
                    firstReaderHoldCount--;
            } else {// 當前線程不爲第一個讀線程
                // 獲取緩存的計數器
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))// 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
                    // 獲取當前線程對應的計數器
                    rh = readHolds.get();

                // 獲取計數
                int count = rh.count;

                if (count <= 1) { // 計數小於等於1
                    // 移除
                    readHolds.remove();
                    if (count <= 0) // 計數小於等於0,拋出異常
                        throw unmatchedUnlockException();
                }

                // 減小計數
                --rh.count;
            }

            //自旋CAS,減去1<<16
            for (;;) { // 無限循環
                // 獲取狀態
                int c = getState();
                // 獲取狀態
                int nextc = c - SHARED_UNIT;

                if (compareAndSetState(c, nextc)) // 比較並進行設置
                    return nextc == 0;
            }
        }

        //讀鎖線程獲取讀鎖
        /*若寫鎖不爲0而且當前線程不佔有寫鎖,則直接返回-1;若讀線程須要被阻塞且讀線程數量小於最大值以及狀態值未改變且修改狀態值成功,則得到讀鎖成功.此時須要判斷當前若爲第一個讀鎖,則須要設置第一個讀線程firstReader和firstReaderHoldCount;若當前線程爲第一個讀線程,則增長firstReaderHoldCount;不然,將設置當前線程對應的HoldCounter對象的值*/
        protected final int tryAcquireShared(int unused) {// 共享模式下獲取資源
            // 獲取當前線程
            Thread current = Thread.currentThread();
            // 獲取狀態
            int c = getState();

            //存在寫鎖且當前線程不是獲取寫鎖的線程,返回-1,獲取讀鎖失敗
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
                return -1;

            // 讀線程數量
            int r = sharedCount(c);

            // 若讀線程須要被阻塞且讀線程數量小於最大值以及狀態值未改變且修改狀態值成功,則得到讀鎖成功
            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { 
                //firstReader是不會放到readHolds裏的, 這樣,在讀鎖只有一個的狀況下,就避免了查找readHolds                
                if (r == 0) { // 讀鎖數量爲0,(首次獲取讀鎖)
                    // 設置第一個讀線程
                    firstReader = current;
                    // 讀線程佔用的資源數爲1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {// 當前線程爲第一個讀線程 (firstReader重入)
                    // 佔用資源數加1
                    firstReaderHoldCount++;
                } else {// 讀鎖數量不爲0而且第一個線程不爲當前線程
                    HoldCounter rh = cachedHoldCounter;//讀鎖重入計數緩存,基於ThreadLocal實現

                    if (rh == null || rh.tid != getThreadId(current))// 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
                        //readHolds是緩存了當前線程的讀鎖重入次數的ThreadLocal
                        //當前線程天然是最後獲取鎖的線程,故將當前線程的holdCounter賦給cachedHoldCounter
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0) //計數爲0
                        //緩存當前線程的holdCounter
                        //在fullTryAcquireShared()方法中,獲取讀鎖失敗的線程會執行:readHolds.remove(),故此時須要從新設置 
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }

           //首次獲取讀鎖失敗後,重試獲取
            return fullTryAcquireShared(current);
        }

        //處理CAS更新失敗和未考慮寫鎖可重入獲取讀鎖,而獲取讀鎖失敗的狀況.
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) { // 無限循環
                // 獲取狀態
                int c = getState();
                
                if (exclusiveCount(c) != 0) {// 寫線程數量不爲0,且被其餘線程持有寫入鎖
                    if (getExclusiveOwnerThread() != current)
                        return -1;//獲取讀鎖失敗,直接返回.
                }else if (readerShouldBlock()) {//寫線程數量爲0而且讀線程應該被阻塞
                    if (firstReader == current) { // 當前線程爲第一個讀線程
                       
                    } else {//當前線程不爲第一個讀線程
                        if (rh == null) {
                            // 獲取計數器
                            rh = cachedHoldCounter;

                            if (rh == null || rh.tid != getThreadId(current)) { // 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
                                rh = readHolds.get();
                                if (rh.count == 0)//計數爲0
                                    readHolds.remove();//移除
                            }
                        }

                        if (rh.count == 0)//計數器爲0
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT) // 讀鎖數量爲最大值,拋出異常
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較而且設置成功
                    if (sharedCount(c) == 0) { // 讀線程數量爲0
                        // 設置第一個讀線程
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

        //用於寫入鎖的tryLock方法
        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);//獲得寫入鎖的數量
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

        //用於讀取鎖的tryLock方法
        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);//獲得讀取鎖的數量
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

        //當前線程是否持有獨佔鎖
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        //建立一個內部條件,用於寫入鎖的newCondition方法
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        //返回當前擁有寫入鎖的線程,若是沒有這樣的線程,則返回 null
        final Thread getOwner() {
            return ((exclusiveCount(getState()) == 0) ?
                    null :
                    getExclusiveOwnerThread());
        }

        //查詢持有讀取鎖的總數
        final int getReadLockCount() {
            return sharedCount(getState());
        }

        //查詢是否某個線程保持了寫入鎖
        final boolean isWriteLocked() {
            return exclusiveCount(getState()) != 0;
        }

        //查詢當前線程在此鎖上保持的重入寫入鎖數量
        final int getWriteHoldCount() {
            return isHeldExclusively() ? exclusiveCount(getState()) : 0;
        }

        //查詢當前線程在此鎖上保持的重入讀取鎖數量
        final int getReadHoldCount() {
            if (getReadLockCount() == 0)
                return 0;

            Thread current = Thread.currentThread();
            if (firstReader == current)
                return firstReaderHoldCount;

            HoldCounter rh = cachedHoldCounter;
            if (rh != null && rh.tid == getThreadId(current))
                return rh.count;

            int count = readHolds.get().count;
            if (count == 0) readHolds.remove();
            return count;
        }

        //自定義序列化方法
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            readHolds = new ThreadLocalHoldCounter();
            setState(0); // 重置爲未鎖定狀態
        }

        //獲得同步狀態值        
        final int getCount() { return getState(); }
    }

    //非公平策略
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;//版本號

        final boolean writerShouldBlock() {//非公平策略,寫入鎖老是阻塞
            return false;
        }

        final boolean readerShouldBlock() {//讀取鎖是否堵塞,取決於等待隊列是否有獲取寫入鎖的線程等待
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    //公平策略
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;//版本號
        
        final boolean writerShouldBlock() {//等待隊列中在當前線程以前有等待線程,則阻塞
            return hasQueuedPredecessors();
        }

        final boolean readerShouldBlock() {//等待隊列中在當前線程以前有等待線程,則阻塞
            return hasQueuedPredecessors();
        }
    }

    //讀取鎖
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;//版本號

        private final Sync sync;//同步隊列對象引用

        //構造方法(將同步隊列對象引用 指向 ReentrantReadWriteLock 的同步隊列)
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        //若是寫鎖未被另外一個線程持有,則獲取讀取鎖並當即返回
        public void lock() {
            sync.acquireShared(1);
        }

        //支持中斷,獲取讀取鎖方法        
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        //讀鎖線程獲取讀鎖
        public boolean tryLock() {
            return sync.tryReadLock();
        }

        //嘗試在共享模式下獲取讀取鎖,若是中斷,則停止;若是超過給定的時間,則返回false
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        //釋放鎖
        public void unlock() {
            sync.releaseShared(1);
        }

        //讀取鎖不支持內部條件隊列,若調用readLock().newCondition();會拋出UnsupportedOperationException
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }

        //返回標識此鎖及其持有讀取鎖的重入次數的字符串
        public String toString() {
            int r = sync.getReadLockCount();
            return super.toString() +
                "[Read locks = " + r + "]";
        }
    }

    //寫入鎖
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;//版本號

        private final Sync sync;//同步隊列對象引用

        //構造方法(將同步隊列對象引用 指向 ReentrantReadWriteLock 的同步隊列)
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        //獲取獨佔鎖,並忽略中斷
        public void lock() {
            sync.acquire(1);
        }

        //獲取獨佔鎖,支持中斷響應
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        //獲取寫鎖
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }

        //嘗試在獨佔模式下獲取寫入鎖,若是中斷,則停止;若是超過給定的時間,則返回false
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }

        //釋放鎖
        public void unlock() {
            sync.release(1);
        }

        //新增條件隊列
        public Condition newCondition() {
            return sync.newCondition();
        }

        //返回標識此鎖及其鎖定狀態的字符串
        public String toString() {
            Thread o = sync.getOwner();
            return super.toString() + ((o == null) ?
                                       "[Unlocked]" :
                                       "[Locked by thread " + o.getName() + "]");
        }

        //返回當前線程是否獨佔鎖
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }

        //返回寫入鎖的重入次數
        public int getHoldCount() {
            return sync.getWriteHoldCount();
        }
    }

    //查詢當前對象是否爲公平鎖對象實例
    public final boolean isFair() {
        return sync instanceof FairSync;
    }

    //返回當前擁有寫入鎖的線程,若是沒有這樣的線程,則返回 null
    protected Thread getOwner() {
        return sync.getOwner();
    }

    //查詢爲此鎖保持的讀取鎖數量
    public int getReadLockCount() {
        return sync.getReadLockCount();
    }

    //查詢是否某個線程保持了寫入鎖
    public boolean isWriteLocked() {
        return sync.isWriteLocked();
    }

    //查詢當前線程是否保持了寫入鎖
    public boolean isWriteLockedByCurrentThread() {
        return sync.isHeldExclusively();
    }

    //查詢當前線程在此鎖上保持的重入寫入鎖數量
    public int getWriteHoldCount() {
        return sync.getWriteHoldCount();
    }

    //查詢當前線程在此鎖上保持的重入讀取鎖數量
    public int getReadHoldCount() {
        return sync.getReadHoldCount();
    }

    //返回一個collection,它包含可能正在等待獲取寫入鎖的線程
    protected Collection<Thread> getQueuedWriterThreads() {
        return sync.getExclusiveQueuedThreads();
    }

    //返回一個collection,它包含可能正在等待獲取讀取鎖的線程
    protected Collection<Thread> getQueuedReaderThreads() {
        return sync.getSharedQueuedThreads();
    }

    //查詢是否全部的線程正在等待獲取讀取或寫入鎖
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    //查詢是否給定線程正在等待獲取讀取或寫入鎖
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }

    //返回等待獲取讀取或寫入鎖的線程估計數目
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    //返回一個 collection,它包含可能正在等待獲取讀取或寫入鎖的線程
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    //查詢是否有些線程正在等待與寫入鎖有關的給定條件
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    //返回正等待與寫入鎖相關的給定條件的線程估計數目
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    //返回一個 collection,它包含可能正在等待與寫入鎖相關的給定條件的那些線程
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    //返回標識此鎖及其鎖狀態的字符串
    public String toString() {
        int c = sync.getCount();
        int w = Sync.exclusiveCount(c);
        int r = Sync.sharedCount(c);

        return super.toString() +
            "[Write locks = " + w + ", Read locks = " + r + "]";
    }

    static final long getThreadId(Thread thread) {
        return UNSAFE.getLongVolatile(thread, TID_OFFSET);
    }

    // Unsafe類 提供了硬件級別的原子操做
    private static final sun.misc.Unsafe UNSAFE;
    private static final long TID_OFFSET;// 線程ID的偏移地址
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            //獲取線程的tid字段的內存地址
            TID_OFFSET = UNSAFE.objectFieldOffset(tk.getDeclaredField("tid"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}

類 ReentrantReadWriteLock

實現的接口:api

    SerializableReadWriteLock緩存

    支持與 ReentrantLock 相似語義的 ReadWriteLock 實現。(在避免"讀-寫"、"寫-寫"衝突的同時也容許多個讀操做同時進行,從而在某些情形下,提升了程序的性能。數據結構

    此類具備如下屬性:併發

  • 獲取順序

        此類不會將讀取者優先或寫入者優先強加給鎖訪問的排序。可是,它確實支持可選的公平 策略。app

    非公平模式(默認)函數

        當非公平地(默認)構造時,未指定進入讀寫鎖的順序。連續競爭的非公平鎖可能無限期地推遲一個或多個 reader 或 writer 線程,但吞吐量一般要高於公平鎖。

    公平模式高併發

        當公平地構造線程時,線程利用一個近似到達順序的策略來爭奪進入。當釋放當前保持的鎖時,能夠爲等待時間最長的單個 writer 線程分配寫入鎖,若是有一組等待時間大於全部正在等待的 writer 線程 的 reader 線程,將爲該組分配讀取鎖.
        注意,非阻塞 ReentrantReadWriteLock.ReadLock.tryLock() 和 ReentrantReadWriteLock.WriteLock.tryLock() 方法不會遵照此公平設置,而是直接將得到鎖,而不考慮等待的線程。
  • 重入

        此鎖容許 reader 和 writer 從新獲取讀取鎖或寫入鎖。須要注意:在寫入線程保持的全部寫入鎖都已經釋放後,才容許重入 reader 使用它們。性能

        此外,writer 能夠獲取讀取鎖,但反過來則不成立。若是 reader 試圖獲取寫入鎖,那麼將永遠不會得到成功。
  • 鎖降級

        重入還容許從寫入鎖降級爲讀取鎖,其實現方式是:先獲取寫入鎖,而後獲取讀取鎖,最後釋放寫入鎖。可是,從讀取鎖升級到寫入鎖是不可能的

  • 鎖獲取的中斷

        讀取鎖和寫入鎖都支持鎖獲取期間的中斷。

  • Condition 支持

        寫入鎖提供了一個 Condition 實現,對於寫入鎖來講,該實現的行爲與 ReentrantLock.newCondition() 提供的 Condition 實現對 ReentrantLock 所作的行爲相同。固然,此 Condition 只能用於寫入鎖。

        讀取鎖不支持 ConditionreadLock().newCondition() 會拋出 UnsupportedOperationException

     此類行爲的序列化方式與內置鎖的相同:反序列化的鎖處於解除鎖狀態,不管序列化該鎖時其狀態如何。

    下面的代碼展現瞭如何利用重入來執行升級緩存後的鎖降級(爲簡單起見,省略了異常處理):

class CachedData {
   Object data;
   volatile boolean cacheValid;
   ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
        // Must release read lock before acquiring write lock
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        // Recheck state because another thread might have acquired
        //   write lock and changed state before we did.
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        // Downgrade by acquiring read lock before releasing write lock
        rwl.readLock().lock();
        rwl.writeLock().unlock(); // Unlock write, still hold read
     }

     use(data);
     rwl.readLock().unlock();
   }
 }

    在使用某些種類的 Collection 時,可使用 ReentrantReadWriteLock 來提升併發性。一般,在預期 collection 很大,讀取者線程訪問它的次數多於寫入者線程,這很值得一試。例如,如下是一個使用 TreeMap 的類,預期它很大,而且能被同時訪問。

class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    public String[] allKeys() {
        r.lock();
        try { return m.keySet().toArray(); }
        finally { r.unlock(); }
    }
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
 }

實現注意事項:

    此鎖最多支持 65535 個遞歸寫入鎖和 65535 個讀取鎖。試圖超出這些限制將致使鎖方法拋出 Error

 

嵌套類摘要

static class ReentrantReadWriteLock.ReadLock 
          readLock() 方法返回的鎖。
static class ReentrantReadWriteLock.WriteLock 
          writeLock() 方法返回的鎖。

構造方法摘要

ReentrantReadWriteLock() 
          使用默認(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock
ReentrantReadWriteLock(boolean fair) 
          使用給定的公平策略建立一個新的 ReentrantReadWriteLock

方法摘要

protected  Thread getOwner() 
          返回當前擁有寫入鎖的線程,若是沒有這樣的線程,則返回 null
protected  Collection<Thread> getQueuedReaderThreads() 
          返回一個 collection,它包含可能正在等待獲取讀取鎖的線程。
protected  Collection<Thread> getQueuedThreads() 
          返回一個 collection,它包含可能正在等待獲取讀取或寫入鎖的線程。
protected  Collection<Thread> getQueuedWriterThreads() 
          返回一個 collection,它包含可能正在等待獲取寫入鎖的線程。
 int getQueueLength() 
          返回等待獲取讀取或寫入鎖的線程估計數目。
 int getReadHoldCount() 
          查詢當前線程在此鎖上保持的重入讀取鎖數量。
 int getReadLockCount() 
          查詢爲此鎖保持的讀取鎖數量。
protected  Collection<Thread> getWaitingThreads(Condition condition) 
          返回一個 collection,它包含可能正在等待與寫入鎖相關的給定條件的那些線程。
 int getWaitQueueLength(Condition condition) 
          返回正等待與寫入鎖相關的給定條件的線程估計數目。
 int getWriteHoldCount() 
          查詢當前線程在此鎖上保持的重入寫入鎖數量。
 boolean hasQueuedThread(Thread thread) 
          查詢是否給定線程正在等待獲取讀取或寫入鎖。
 boolean hasQueuedThreads() 
          查詢是否全部的線程正在等待獲取讀取或寫入鎖。
 boolean hasWaiters(Condition condition) 
          查詢是否有些線程正在等待與寫入鎖有關的給定條件。
 boolean isFair() 
          若是此鎖將公平性設置爲 ture,則返回 true
 boolean isWriteLocked() 
          查詢是否某個線程保持了寫入鎖。
 boolean isWriteLockedByCurrentThread() 
          查詢當前線程是否保持了寫入鎖。
 ReentrantReadWriteLock.ReadLock readLock() 
          返回用於讀取操做的鎖。
 String toString() 
          返回標識此鎖及其鎖狀態的字符串。
 ReentrantReadWriteLock.WriteLock writeLock() 
          返回用於寫入操做的鎖。

  

ReentrantReadWriteLock

public ReentrantReadWriteLock()

    使用默認(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock

 

ReentrantReadWriteLock

public ReentrantReadWriteLock(boolean fair)

    使用給定的公平策略建立一個新的 ReentrantReadWriteLock

    參數:

    fair - 若是此鎖應該使用公平排序策略,則該參數的值爲 true

 

writeLock

public ReentrantReadWriteLock.WriteLock writeLock()

    返回用於寫入操做的鎖。

    實現了接口 ReadWriteLock 中的 writeLock方法

 

readLock

public ReentrantReadWriteLock.ReadLock readLock()

    返回用於讀取操做的鎖。

    實現了接口 ReadWriteLock 中的 readLock方法

 

isFair

public final boolean isFair()

    若是此鎖將公平性設置爲 ture,則返回 true

 

getOwner

protected Thread getOwner()

    返回當前擁有寫入鎖的線程,若是沒有這樣的線程,則返回 null。當經過不是全部者的線程調用此方法時,返回值反映當前鎖狀態的最接近近似值。例如,即便存在試圖得到鎖的線程,可是在它尚未得到前,全部者可能暫時爲 null。設計此方法是爲了便於構造提供更多擴展的鎖監視設施的子類。

    返回:

    全部者;若是沒有全部者,則返回 null

 

getReadLockCount

public int getReadLockCount()

    查詢爲此鎖保持的讀取鎖數量。此方法設計用於監視系統狀態,而不是同步控制。

    返回:

        所保持的讀取鎖數量。

 

isWriteLocked

public boolean isWriteLocked()

    查詢是否某個線程保持了寫入鎖。此方法設計用於監視系統狀態,而不是同步控制。

    返回:

        若是某個線程保持寫入鎖,則返回 true;不然返回 false

 

isWriteLockedByCurrentThread

public boolean isWriteLockedByCurrentThread()

    查詢當前線程是否保持了寫入鎖。

    返回:

        若是當前線程保持寫入鎖,則返回 true;不然返回 false

 

getWriteHoldCount

public int getWriteHoldCount()

    查詢當前線程在此鎖上保持的重入寫入鎖數量。對於與解除鎖操做不匹配的每一個鎖操做,writer 線程都會爲其保持一個鎖。

    返回:

        當前線程保持的寫入鎖數量,若是當前線程從未保持過寫入鎖,則返回 0

 

getReadHoldCount

public int getReadHoldCount()

    查詢當前線程在此鎖上保持的重入讀取鎖數量。對於與解除鎖操做不匹配的每一個鎖操做,reader 線程都會爲其保持一個鎖。

    返回:

        當前線程保持的讀取鎖數量;若是當前線程從未保持過讀取鎖,則返回 0

 

getQueuedWriterThreads

protected Collection<Thread> getQueuedWriterThreads()

    返回一個 collection,它包含可能正在等待獲取寫入鎖的線程。由於在構成此結果的同時,實際的線程 set 可能不斷髮生變化,因此返回的 collection 僅是盡力而爲得到的估計值。所返回 collection 的元素沒有特定的順序。設計此方法是爲了便於構造提供更多擴展的鎖監視器設施的子類。

    返回:

    線程的 collection

 

getQueuedReaderThreads

protected Collection<Thread> getQueuedReaderThreads()

    返回一個 collection,它包含可能正在等待獲取讀取鎖的線程。由於在構成此結果的同時,實際的線程 set 可能不斷髮生變化,因此返回的 collection 僅是盡力而爲得到的估計值。所返回 collection 的元素沒有特定的順序。設計此方法是爲了便於構造提供更多擴展的鎖監視器設施的子類。

    返回:

        線程的 collection

 

hasQueuedThreads

public final boolean hasQueuedThreads()

    查詢是否全部的線程正在等待獲取讀取或寫入鎖。注意,由於隨時可能發生取消操做,因此返回 true 並不保證任何其餘線程將獲取鎖。此方法主要用於監視系統狀態。

    返回:

        若是有其餘線程正等待獲取鎖,則返回 true

 

hasQueuedThread

public final boolean hasQueuedThread(Thread thread)

    查詢是否給定線程正在等待獲取讀取或寫入鎖。注意,由於隨時可能發生取消操做,因此返回 true 並不保證此線程將獲取鎖。此方法主要用於監視系統狀態。

    參數:

    thread - 線程

    返回:

        若是將給定的線程加入等待此鎖的隊列,則返回 true

    拋出:

    NullPointerException - 若是線程爲 null

 

getQueueLength

public final int getQueueLength()

    返回等待獲取讀取或寫入鎖的線程估計數目。由於在此方法遍歷內部數據結構時,能夠動態地更改線程數,因此該值只能是一個估計值。此方法設計用於監視系統狀態,而不是同步控制。

    返回:

        正在等待此鎖的線程估計數目

 

getQueuedThreads

protected Collection<Thread> getQueuedThreads()

    返回一個 collection,它包含可能正在等待獲取讀取或寫入鎖的線程。由於在構造此結果的同時,實際的線程 set 可能不斷髮生變化,因此返回的 collection 僅是盡力而爲得到的估計值。所返回 collection 中的元素沒有特定的順序。此方法用於加快子類的構造速度,提供更多的監視設施。

 

hasWaiters

public boolean hasWaiters(Condition condition)

    查詢是否有些線程正在等待與寫入鎖有關的給定條件。注意,由於隨時可能發生超時和中斷,因此返回 true 並不保證未來某個 signal 將喚醒任何線程。此方法主要用於監視系統狀態。

    參數:

    condition - 條件

    返回:

        若是有等待的線程,則返回 true

    拋出:

    IllegalMonitorStateException - 若是沒有保持此鎖

    IllegalArgumentException - 若是給定的條件與此鎖無關

    NullPointerException - 若是條件爲 null

 

getWaitQueueLength

public int getWaitQueueLength(Condition condition)

    返回正等待與寫入鎖相關的給定條件的線程估計數目。注意,由於隨時可能發生超時和中斷,因此只能將估計值做爲實際等待線程數的上限。此方法設計用於監視系統狀態,而不是同步控制。

    參數:

    condition - 條件

    返回:

        等待線程的估計數

    拋出:

    IllegalMonitorStateException - 若是沒有保持此鎖

    IllegalArgumentException - 若是給定的條件與此鎖無關

    NullPointerException - 若是條件爲 null

 

getWaitingThreads

protected Collection<Thread> getWaitingThreads(Condition condition)

    返回一個 collection,它包含可能正在等待與寫入鎖相關的給定條件的那些線程。由於在構造此結果的同時,實際的線程 set 可能不斷髮生變化,因此返回的 collection 僅是盡力而爲得到的估計值。所返回 collection 中的元素沒有特定的順序。此方法用於加快子類的構造速度,提供更多的條件監視設施。

    參數:

    condition - 條件

    返回:

        線程的 collection

    拋出:

    IllegalMonitorStateException - 若是沒有保持此鎖

    IllegalArgumentException - 若是給定 condition 與此鎖無關

    NullPointerException - 若是條件爲 null

 

toString

public String toString()

    返回標識此鎖及其鎖狀態的字符串。該狀態括在括號中,它包括字符串 "Write locks =",後跟重入保持寫入鎖的數目,而後是字符串 "Read locks =",後跟所保持讀取鎖的數目。

    覆蓋:

        類 Object 中的 toString

    返回:

        標識此鎖及其鎖狀態的字符串

 

實現原理

  ReentrantReadWriteLock 基於AQS實現的,它的自定義同步器Sync(繼承AQS)須要在同步狀態(一個整型變量state)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成爲讀寫鎖實現的關鍵。

    若是在一個整型變量上維護多種狀態,就必定須要「按位切割」 SHARED_SHIFT 這個變量,讀寫鎖將變量切分紅了兩個部分,高16位表示讀,低16位表示寫。

      ReentrantReadWriteLock含有兩把鎖readerLock和writerLock,其中ReadLock和WriteLock都是內部類。

        寫鎖是一個可重入的獨佔鎖,使用AQS提供的獨佔式獲取同步狀態的策略。

        獲取寫鎖的步驟以下:

            1)判斷同步狀態state是否爲0。若是state!=0,說明已經有其餘線程獲取了讀鎖或寫鎖,執行2);不然執行5)。

            2)判斷同步狀態state的低16位(w)是否爲0。若是w=0,說明其餘線程獲取了讀鎖,返回false;若是w!=0,說明其餘線程獲取了寫鎖,執行步驟3)。

            3)判斷獲取了寫鎖是不是當前線程,若不是返回false,不然執行4);

            4)判斷當前線程獲取寫鎖的重入次數是否超過最大次數,若超過,拋異常,反之更新同步狀態+1,返回true。

            5)此時讀鎖或寫鎖都沒有被獲取,判斷是否須要阻塞(公平和非公平方式實現不一樣):若是不須要阻塞,則CAS更新同步狀態,若CAS成功則返回true,不然返回false;若是須要阻塞,則返回false。

        讀鎖是一個可重入的共享鎖,採用AQS提供的共享式獲取同步狀態的策略。

        獲取讀鎖的大體步驟以下:

            1)經過同步狀態低16位判斷,若是存在寫鎖且當前線程不是獲取寫鎖的線程,返回-1,獲取讀鎖失敗;不然執行步驟2)。

            2)經過readerShouldBlock判斷當前線程是否應該被阻塞,若是不該該阻塞則嘗試CAS同步狀態;不然執行3)。

            3)第一次獲取讀鎖失敗,經過fullTryAcquireShared再次嘗試獲取讀鎖。

 

用法示例Demo:

package com.thread;

import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadThread extends Thread {
    private ReentrantReadWriteLock rrwLock;

    public ReadThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");
            System.out.println(Thread.currentThread().getName() +" 持有的讀取鎖重入次數:"+rrwLock.getReadHoldCount());
            System.out.println("持有讀取鎖的線程總數:"+rrwLock.getReadLockCount());
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");
        }
    }
}

class WriteThread extends Thread {
    private ReentrantReadWriteLock rrwLock;

    public WriteThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.writeLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");
            //寫鎖嘗試獲取讀鎖
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() +"寫鎖 獲取讀鎖成功");
            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() +"寫鎖 釋放持有的讀鎖完成");
            //降級
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() +"降級 獲取讀鎖成功");
        } finally {
            rrwLock.writeLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");

            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() +"降級 釋放讀鎖完成");
        }
    }
}

public class ReentrantReadWriteLockDemo {
    public static void main(String[] args) {
        ReentrantReadWriteLock rrwLock = new ReentrantReadWriteLock();
        ReadThread rt1 = new ReadThread("rt1", rrwLock);
        ReadThread rt2 = new ReadThread("rt2", rrwLock);
        WriteThread wt1 = new WriteThread("wt1", rrwLock);
        rt1.start();
        rt2.start();
        wt1.start();
    }
}

    運行結果:

rt1 trying to lock
rt2 trying to lock

rt1 lock successfully
rt1 持有的讀取鎖重入次數:1

wt1 trying to lock
持有讀取鎖的線程總數:2
rt2 lock successfully
rt2 持有的讀取鎖重入次數:1
持有讀取鎖的線程總數:2

rt1 unlock successfully
rt2 unlock successfully

wt1 lock successfully wt1寫鎖 獲取讀鎖成功 wt1寫鎖 釋放持有的讀鎖完成 wt1降級 獲取讀鎖成功 wt1 unlock successfully wt1降級 釋放讀鎖完成

相關文章
相關標籤/搜索