【JUC】JDK1.8源碼分析之ReentrantReadWriteLock(七)

1、前言html

  在分析了鎖框架的其餘類以後,下面進入鎖框架中最後一個類ReentrantReadWriteLock的分析,它表示可重入讀寫鎖,ReentrantReadWriteLock中包含了兩種鎖,讀鎖ReadLock和寫鎖WriteLock,能夠經過這兩種鎖實現線程間的同步,下面開始進行分析。java

2、ReentrantReadWriteLock數據結構緩存

  分析源碼能夠知道,ReentrantReadWriteLock底層是基於ReentrantLockAbstractQueuedSynchronizer來實現的,因此,ReentrantReadWriteLock的數據結構也依託於AQS的數據結構,在前面對AQS的分析中已經指出了其數據結構,在這裏再也不累贅。數據結構

3、ReentrantReadWriteLock源碼分析併發

  3.1. 類的繼承關係  app

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {}

  說明:能夠看到,ReentrantReadWriteLock實現了ReadWriteLock接口,ReadWriteLock接口定義了獲取讀鎖和寫鎖的規範,具體須要實現類去實現;同時其還實現了Serializable接口,表示能夠進行序列化,在源代碼中能夠看到ReentrantReadWriteLock實現了本身的序列化邏輯。框架

  3.2. 類的內部類ide

  ReentrantReadWriteLock有五個內部類,五個內部類之間也是相互關聯的。內部類的關係以下圖所示。函數

  說明:如上圖所示,Sync繼承自AQS、NonfairSync繼承自Sync類、FairSync繼承自Sync類;ReadLock實現了Lock接口、WriteLock也實現了Lock接口。oop

  ① Sync類

  1. 類的繼承關係 

abstract static class Sync extends AbstractQueuedSynchronizer {}

  說明:Sync抽象類繼承自AQS抽象類,Sync類提供了對ReentrantReadWriteLock的支持。

  2. 類的內部類

  Sync類內部存在兩個內部類,分別爲HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要與讀鎖配套使用,其中,HoldCounter源碼以下。

        // 計數器
        static final class HoldCounter {
            // 計數
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            // 獲取當前線程的TID屬性的值
            final long tid = getThreadId(Thread.currentThread());
        }
View Code

  說明:HoldCounter主要有兩個屬性,count和tid,其中count表示某個讀線程重入的次數,tid表示該線程的tid字段的值,該字段能夠用來惟一標識一個線程。ThreadLocalHoldCounter的源碼以下  

        // 本地線程計數器
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            // 重寫初始化方法,在沒有進行set的狀況下,獲取的都是該HoldCounter值
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
View Code

  說明:ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類能夠將線程與對象相關聯。在沒有進行set的狀況下,get到的均是initialValue方法裏面生成的那個HolderCounter對象。

  3. 類的屬性 

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 版本序列號
        private static final long serialVersionUID = 6317671515068378041L;        
        // 高16位爲讀鎖,低16位爲寫鎖
        static final int SHARED_SHIFT   = 16;
        // 讀鎖單位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 讀鎖最大數量
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 寫鎖最大數量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        // 本地線程計數器
        private transient ThreadLocalHoldCounter readHolds;
        // 緩存的計數器
        private transient HoldCounter cachedHoldCounter;
        // 第一個讀線程
        private transient Thread firstReader = null;
        // 第一個讀線程的計數
        private transient int firstReaderHoldCount;
    }
View Code

  說明:該屬性中包括了讀鎖、寫鎖線程的最大量。本地線程計數器等。

  4. 類的構造函數  

        // 構造函數
        Sync() {
            // 本地線程計數器
            readHolds = new ThreadLocalHoldCounter();
            // 設置AQS的狀態
            setState(getState()); // ensures visibility of readHolds
        }
View Code

  說明:在Sync的構造函數中設置了本地線程計數器和AQS的狀態state。

  5. 核心函數分析

  對ReentrantReadWriteLock對象的操做絕大多數都轉發至Sync對象進行處理。下面對Sync類中的重點函數進行分析

  I. sharedCount函數

  表示佔有讀鎖的線程數量,源碼以下 

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

  說明:直接將state右移16位,就能夠獲得讀鎖的線程數量,由於state的高16位表示讀鎖,對應的第十六位表示寫鎖數量。

  II. exclusiveCount函數

  表示佔有寫鎖的線程數量,源碼以下  

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

  說明:直接將狀態state和(2^16 - 1)作與運算,其等效於將state模上2^16。寫鎖數量由state的低十六位表示。

  III. tryRelease函數  

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            // 獲取當前線程
            Thread current = Thread.currentThread();
            // 獲取狀態
            int c = getState();
            // 寫線程數量
            int w = exclusiveCount(c);
            if (c != 0) { // 狀態不爲0
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread()) // 寫線程數量爲0或者當前線程沒有佔有獨佔資源
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過最高寫線程數量
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 設置AQS狀態
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires)) // 寫線程是否應該被阻塞
                return false;
            // 設置獨佔線程
            setExclusiveOwnerThread(current);
            return true;
        }
View Code

  說明:此函數用於釋放寫鎖資源,首先會判斷該線程是否爲獨佔線程,若不爲獨佔線程,則拋出異常,不然,計算釋放資源後的寫鎖的數量,若爲0,表示成功釋放,資源不將被佔用,不然,表示資源還被佔用。其函數流程圖以下。

  IV. tryAcquire函數  

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            // 獲取當前線程
            Thread current = Thread.currentThread();
            // 獲取狀態
            int c = getState();
            // 寫線程數量
            int w = exclusiveCount(c);
            if (c != 0) { // 狀態不爲0
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread()) // 寫線程數量爲0或者當前線程沒有佔有獨佔資源
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過最高寫線程數量
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 設置AQS狀態
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires)) // 寫線程是否應該被阻塞
                return false;
            // 設置獨佔線程
            setExclusiveOwnerThread(current);
            return true;
        }
View Code

  說明:此函數用於獲取寫鎖,首先會獲取state,判斷是否爲0,若爲0,表示此時沒有讀鎖線程,再判斷寫線程是否應該被阻塞,而在非公平策略下老是不會被阻塞,在公平策略下會進行判斷(判斷同步隊列中是否有等待時間更長的線程,若存在,則須要被阻塞,不然,無需阻塞),以後在設置狀態state,而後返回true。若state不爲0,則表示此時存在讀鎖或寫鎖線程,若寫鎖線程數量爲0或者當前線程爲獨佔鎖線程,則返回false,表示不成功,不然,判斷寫鎖線程的重入次數是否大於了最大值,如果,則拋出異常,不然,設置狀態state,返回true,表示成功。其函數流程圖以下

  V. tryReleaseShared函數 

        protected final boolean tryReleaseShared(int unused) {
            // 獲取當前線程
            Thread current = Thread.currentThread();
            if (firstReader == current) { // 當前線程爲第一個讀線程
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1) // 讀線程佔用的資源數爲1
                    firstReader = null;
                else // 減小佔用的資源
                    firstReaderHoldCount--;
            } else { // 當前線程不爲第一個讀線程
                // 獲取緩存的計數器
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current)) // 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
                    // 獲取當前線程對應的計數器
                    rh = readHolds.get();
                // 獲取計數
                int count = rh.count;
                if (count <= 1) { // 計數小於等於1
                    // 移除
                    readHolds.remove();
                    if (count <= 0) // 計數小於等於0,拋出異常
                        throw unmatchedUnlockException();
                }
                // 減小計數
                --rh.count;
            }
            for (;;) { // 無限循環
                // 獲取狀態
                int c = getState();
                // 獲取狀態
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc)) // 比較並進行設置
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }
View Code

  說明:此函數表示讀鎖線程釋放鎖。首先判斷當前線程是否爲第一個讀線程firstReader,如果,則判斷第一個讀線程佔有的資源數firstReaderHoldCount是否爲1,如果,則設置第一個讀線程firstReader爲空,不然,將第一個讀線程佔有的資源數firstReaderHoldCount減1;若當前線程不是第一個讀線程,那麼首先會獲取緩存計數器(上一個讀鎖線程對應的計數器 ),若計數器爲空或者tid不等於當前線程的tid值,則獲取當前線程的計數器,若是計數器的計數count小於等於1,則移除當前線程對應的計數器,若是計數器的計數count小於等於0,則拋出異常,以後再減小計數便可。不管何種狀況,都會進入無限循環,該循環能夠確保成功設置狀態state。其流程圖以下

  VI. tryAcquireShared函數 

        private IllegalMonitorStateException unmatchedUnlockException() {
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        }
        
        // 共享模式下獲取資源
        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            // 獲取當前線程
            Thread current = Thread.currentThread();
            // 獲取狀態
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current) // 寫線程數不爲0而且佔有資源的不是當前線程
                return -1;
            // 讀鎖數量
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) { // 讀線程是否應該被阻塞、而且小於最大值、而且比較設置成功
                if (r == 0) { // 讀鎖數量爲0
                    // 設置第一個讀線程
                    firstReader = current;
                    // 讀線程佔用的資源數爲1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) { // 當前線程爲第一個讀線程
                    // 佔用資源數加1
                    firstReaderHoldCount++;
                } else { // 讀鎖數量不爲0而且不爲當前線程
                    // 獲取計數器
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) // 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
                        // 獲取當前線程對應的計數器
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0) // 計數爲0
                        // 設置
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }
View Code

  說明:此函數表示讀鎖線程獲取讀鎖。首先判斷寫鎖是否爲0而且當前線程不佔有獨佔鎖,直接返回;不然,判斷讀線程是否須要被阻塞而且讀鎖數量是否小於最大值而且比較設置狀態成功,若當前沒有讀鎖,則設置第一個讀線程firstReader和firstReaderHoldCount;若當前線程線程爲第一個讀線程,則增長firstReaderHoldCount;不然,將設置當前線程對應的HoldCounter對象的值。流程圖以下。


  VII. fullTryAcquireShared函數 

        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) { // 無限循環
                // 獲取狀態
                int c = getState();
                if (exclusiveCount(c) != 0) { // 寫線程數量不爲0
                    if (getExclusiveOwnerThread() != current) // 不爲當前線程
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) { // 寫線程數量爲0而且讀線程被阻塞
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) { // 當前線程爲第一個讀線程
                        // assert firstReaderHoldCount > 0;
                    } else { // 當前線程不爲第一個讀線程
                        if (rh == null) { // 計數器不爲空
                            // 
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) { // 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT) // 讀鎖數量爲最大值,拋出異常
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較而且設置成功
                    if (sharedCount(c) == 0) { // 讀線程數量爲0
                        // 設置第一個讀線程
                        firstReader = current;
                        // 
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
View Code

  說明:在tryAcquireShared函數中,若是下列三個條件不知足(讀線程是否應該被阻塞、小於最大值、比較設置成功)則會進行fullTryAcquireShared函數中,它用來保證相關操做能夠成功。其邏輯與tryAcquireShared邏輯相似,再也不累贅。

  而其餘內部類的操做基本上都是轉化到了對Sync對象的操做,在此再也不累贅。

  3.3. 類的屬性  

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    // 版本序列號    
    private static final long serialVersionUID = -6992448646407690164L;    
    // 讀鎖
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 寫鎖
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 同步隊列
    final Sync sync;
    
    private static final sun.misc.Unsafe UNSAFE;
    // 線程ID的偏移地址
    private static final long TID_OFFSET;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            // 獲取線程的tid字段的內存地址
            TID_OFFSET = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("tid"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
View Code

  說明:能夠看到ReentrantReadWriteLock屬性包括了一個ReentrantReadWriteLock.ReadLock對象,表示讀鎖;一個ReentrantReadWriteLock.WriteLock對象,表示寫鎖;一個Sync對象,表示同步隊列。

  3.4. 類的構造函數

  1. ReentrantReadWriteLock()型構造函數  

    public ReentrantReadWriteLock() {
        this(false);
    }
View Code

  說明:此構造函數會調用另一個有參構造函數。

  2. ReentrantReadWriteLock(boolean)型構造函數 

    public ReentrantReadWriteLock(boolean fair) {
        // 公平策略或者是非公平策略
        sync = fair ? new FairSync() : new NonfairSync();
        // 讀鎖
        readerLock = new ReadLock(this);
        // 寫鎖
        writerLock = new WriteLock(this);
    }
View Code

  說明:能夠指定設置公平策略或者非公平策略,而且該構造函數中生成了讀鎖與寫鎖兩個對象。

  3.5 核心函數分析

  對ReentrantReadWriteLock的操做基本上都轉化爲了對Sync對象的操做,而Sync的函數已經分析過,再也不累贅。

4、示例

  下面給出了一個使用ReentrantReadWriteLock的示例,源代碼以下。

package com.hust.grid.leesf.reentrantreadwritelock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadThread extends Thread {
    private ReentrantReadWriteLock rrwLock;
    
    public ReadThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");
            Thread.sleep(5000);        
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");
        }
    }
}

class WriteThread extends Thread {
    private ReentrantReadWriteLock rrwLock;
    
    public WriteThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.writeLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");    
        } finally {
            rrwLock.writeLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");
        }
    }
}

public class ReentrantReadWriteLockDemo {
    public static void main(String[] args) {
        ReentrantReadWriteLock rrwLock = new ReentrantReadWriteLock();
        ReadThread rt1 = new ReadThread("rt1", rrwLock);
        ReadThread rt2 = new ReadThread("rt2", rrwLock);
        WriteThread wt1 = new WriteThread("wt1", rrwLock);
        rt1.start();
        rt2.start();
        wt1.start();
    } 
}
View Code

  運行結果(某一次):  

rt1 trying to lock
rt2 trying to lock
wt1 trying to lock
rt1 lock successfully
rt2 lock successfully
rt1 unlock successfully
rt2 unlock successfully
wt1 lock successfully
wt1 unlock successfully

  說明:程序中生成了一個ReentrantReadWriteLock對象,而且設置了兩個讀線程,一個寫線程。根據結果,可能存在以下的時序圖。

  ① rt1線程執行rrwLock.readLock().lock操做,主要的函數調用以下。

  說明:此時,AQS的狀態state爲2^16 次方,即表示此時讀線程數量爲1。

  ② rt2線程執行rrwLock.readLock().lock操做,主要的函數調用以下。

  說明:此時,AQS的狀態state爲2 * 2^16次方,即表示此時讀線程數量爲2。

  ③ wt1線程執行rrwLock.writeLock().lock操做,主要的函數調用以下。

  說明:此時,在同步隊列Sync queue中存在兩個結點,而且wt1線程會被禁止運行。

  ④ rt1線程執行rrwLock.readLock().unlock操做,主要的函數調用以下。

  說明:此時,AQS的state爲2^16次方,表示還有一個讀線程。

  ⑤ rt2線程執行rrwLock.readLock().unlock操做,主要的函數調用以下。

  說明:當rt2線程執行unlock操做後,AQS的state爲0,而且wt1線程將會被unpark,其得到CPU資源就能夠運行。

  ⑥ wt1線程得到CPU資源,繼續運行,須要恢復。因爲以前acquireQueued函數中的parkAndCheckInterrupt函數中被禁止的,因此,恢復到parkAndCheckInterrupt函數中,主要的函數調用以下

  說明:最後,sync queue隊列中只有一個結點,而且頭結點尾節點均指向它,AQS的state值爲1,表示此時有一個寫線程。

  ⑦ wt1執行rrwLock.writeLock().unlock操做,主要的函數調用以下。

  說明:此時,AQS的state爲0,表示沒有任何讀線程或者寫線程了。而且Sync queue結構與上一個狀態的結構相同,沒有變化。

5、總結

  通過分析ReentrantReadWriteLock的源碼,可知其能夠實現多個線程同時讀,此時,寫線程會被阻塞。而且,寫線程獲取寫入鎖後能夠獲取讀取鎖,而後釋放寫入鎖,這樣寫入鎖變成了讀取鎖。至此,併發框架中的鎖框架就已經所有介紹完成了,經過分析源碼,有了很多收穫,謝謝各位園友的觀看~

相關文章
相關標籤/搜索