深刻淺出ReentrantReadWriteLock源碼解析

讀寫鎖實現邏輯相對比較複雜,可是倒是一個常用到的功能,但願將我對ReentrantReadWriteLock的源碼的理解記錄下來,能夠對你們有幫助html

前提條件

在理解ReentrantReadWriteLock時須要具有一些基本的知識java

理解AQS的實現原理

以前有寫過一篇《深刻淺出AQS源碼解析》關於AQS的文章,對AQS原理不瞭解的同窗能夠先看一下緩存

理解ReentrantLock的實現原理

ReentrantLock的實現原理能夠參考《深刻淺出ReentrantLock源碼解析》函數

什麼是讀鎖和寫鎖

對於資源的訪問就兩種形式:要麼是讀操做,要麼是寫操做。讀寫鎖是將被鎖保護的臨界資源的讀操做和寫操做分開,容許同時有多個線程同時對臨界資源進行讀操做,任意時刻只容許一個線程對資源進行寫操做。簡單的說,對與讀操做採用的是共享鎖,對於寫操做採用的是排他鎖優化

讀寫狀態的設計

ReentrantReadWriteLock是用state字段來表示讀寫鎖重複獲取資源的次數,高16位用來標記讀鎖的同步狀態,低16位用來標記寫鎖的同步狀態ui

// 劃分的邊界線,用16位來劃分
static final int SHARED_SHIFT   = 16;
// 讀鎖的基本單位,也就是讀鎖加1或者減1的基本單位(1左移16位後的值)
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 讀寫鎖的最大值(在計算讀鎖的時候須要先右移16位)
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 寫鎖的掩碼,state值與掩碼作與運算後獲得寫鎖的真實值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 獲取資源被讀鎖佔用的次數
static int sharedCount(int c){
      return c >>> SHARED_SHIFT;
}

// 獲取資源被寫鎖佔用的次數
static int exclusiveCount(int c){
      return c & EXCLUSIVE_MASK;
}

在統計讀鎖被每一個線程持有的次數時,ReentrantReadWriteLock採用的是HoldCounter來實現的,具體以下:線程

// 持有讀鎖的線程重入的次數
static final class HoldCounter {
    // 重入的次數
    int count = 0;
    // 持有讀鎖線程的線程id
    final long tid = getThreadId(Thread.currentThread());
}

/**
 * 採用ThreadLocal機制,作到線程之間的隔離
 */
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

/**
 * 線程持有可重入讀鎖的次數
 */
private transient ThreadLocalHoldCounter readHolds;

/**
 * 緩存最後一個成功獲取讀鎖的線程的重入次數,有兩方面的好處:
 * 一、避免了經過訪問ThreadLocal來獲取讀鎖的信息,這個優化的前提是
 *    假設多數狀況下,一個獲取讀鎖的線程,使用完之後就會釋放讀鎖,
 *    也就是說最後獲取讀鎖的線程和最早釋放讀鎖的線程大多數狀況下是同一個線程
 * 二、由於ThreadLocal中的key是一個弱引用類型,當有一個變量持有HoldCounter對象時,
 *    ThreadLocalHolderCounter中最後一個獲取鎖的線程信息不會被GC回收掉
 */
private transient HoldCounter cachedHoldCounter;

/**
 * 第一個獲取讀鎖的線程,有兩方面的考慮:
 * 一、記錄將共享數量從0變成1的線程
 * 二、對於無競爭的讀鎖來講進行線程重入次數數據的追蹤的成本是比較低的
 */
private transient Thread firstReader = null;

/**
 * 第一個獲取讀鎖線程的重入次數,能夠參考firstReader的解析
 */
private transient int firstReaderHoldCount;

ReentrantReadWriteLock 源碼解析

ReentrantReadWriteLock 一共有5個內部類,具體以下:設計

  • Sync:公平鎖和非公平鎖的抽象類
  • NonfairSync:非公平鎖的具體實現
  • FairSync:公平鎖的具體實現
  • ReadLock:讀鎖的具體實現
  • WriteLock:寫鎖的具體實現

咱們從讀鎖ReadLock和寫鎖WriteLock的源碼開始分析,而後順着這個思路將整個ReentrantReadWriteLock中全部的核心源碼(全部的包括內部類)進行分析。code

ReadLock類源碼解析

public static class ReadLock implements Lock, java.io.Serializable {
    
    private final Sync sync;

    /**
     * 經過ReentrantReadWriteLock中的公平鎖或非公平鎖來初始化sync變量
     */
    protected ReadLock(ReentrantReadWriteLock lock) {
          sync = lock.sync;
    }

    /**
     * 阻塞的方式獲取鎖,由於讀鎖是共享鎖,因此調用acquireShared方法
     */
    public void lock() {
          sync.acquireShared(1);
    }

    /**
     * 可中斷且阻塞的方式獲取鎖
     */
    public void lockInterruptibly() throws InterruptedException {
          sync.acquireSharedInterruptibly(1);
    }

    /**
     * 超時嘗試獲取鎖,非阻塞的方式
     */
    public boolean tryLock(long timeout, TimeUnit unit)
          throws InterruptedException {
          return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    /**
     * 嘗試獲取寫鎖,非阻塞的方式
     */
    public boolean tryLock() {
          return sync.tryReadLock();
    }

    /**
     * 釋放鎖
     */
    public void unlock() {
          sync.releaseShared(1);
    }
}

接下來,咱們重點看一下在公平鎖和非公平鎖下Sync.acquireSharedSync.releaseSharedSync.tryLock這3個方法的實現(acquireSharedInterruptiblytryAcquireSharedNanos是AQS中的方法,這裏就不在討論了,具體能夠參考《深刻淺出AQS源碼解析》),其中Sync.acquireShared中核心調用的方法是Sync.tryAcquireSharedSync. releaseShared中核心調用的方法是Sync.tryReleaseSharedSync.tryLock中核心調用的方法是Sync.tryReadLock,因此咱們重點分析Sync.tryAcquireShared方法、Sync.tryReleaseShared方法和sync.tryReadLock方法htm

Sync.tryAcquireShared方法

protected final int tryAcquireShared(int unused) {
    /**
     * 以共享鎖的方式嘗試獲取讀鎖,步驟以下:
     * 一、若是資源已經被寫鎖獲取了,直接返回失敗
     * 二、若是讀鎖不須要等待(公平鎖和非公平鎖的具體實現有區別)、
     *    而且讀鎖未超過上限、同時設置讀鎖的state值成功,則返回成功
     * 三、若是步驟2失敗了,須要進入fullTryAcquireShared函數再次嘗試獲取讀鎖
     */
    Thread current = Thread.currentThread();
    int c = getState();
    /**
     * 資源已經被寫鎖獨佔,直接返回false
     */
    if (exclusiveCount(c) != 0 &&
          getExclusiveOwnerThread() != current)
          return -1;
    int r = sharedCount(c);
    /**
     * 一、讀鎖不須要等待
     * 二、讀鎖未超過上限
     * 三、設置讀鎖的state值成功
     * 則返回成功
     */
    if (!readerShouldBlock() &&
          r < MAX_COUNT &&
          compareAndSetState(c, c + SHARED_UNIT)) {
          if (r == 0) {
            // 記錄第一個獲取讀鎖的線程信息
            firstReader = current;
            firstReaderHoldCount = 1;
          } else if (firstReader == current) {
            // 第一個獲取讀鎖的線程再次獲取鎖(重入)
            firstReaderHoldCount++;
          } else {
            // 修改獲取鎖的線程的重入的次數
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
          }
          return 1;
    }
    /**
     * 若是CAS失敗再次獲取讀鎖
     */
    return fullTryAcquireShared(current);
}

接下來看一下fullTryAcquireShared方法:

final int fullTryAcquireShared(Thread current) {
    /**
     * 調用該方法的線程都是但願獲取讀鎖的線程,有3種狀況:
     * 一、在嘗試經過CAS操做修改state時因爲有多個競爭讀鎖的線程致使CAS操做失敗
     * 二、須要排隊等待獲取讀鎖的線程(公平鎖)
     * 三、超過讀鎖限制的最大申請次數的線程
     */
    HoldCounter rh = null;
    for (;;) { // 無限循環獲取鎖
        int c = getState();
        // 已經被寫線程獲取鎖了,直接返回
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        // 須要被block的讀線程(公平鎖)
        } else if (readerShouldBlock()) {
            // 若是時當前線程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 清理當前線程中重入次數爲0的數據
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 當前線程獲取鎖失敗
                if (rh.count == 0)
                    return -1;
            }
        }
        // 判斷是否超過讀鎖的最大值
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 修改讀鎖的state值
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 最新獲取到讀鎖的線程設置相關的信息
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++; // 當前線程重複獲取鎖(重入)
            } else {
                // 在readHolds中記錄獲取鎖的線程的信息
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

Sync.tryReleaseShared方法

tryReleaseShared方法的實現邏輯比較簡單,咱們直接看代碼中的註釋

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    /**
     * 若是當前線程是第一個獲取讀鎖的線程,有兩種狀況:
     * 一、若是持有鎖的次數爲1,直接釋放成功
     * 二、若是持有鎖的次數大於1,說明有重入的狀況,須要次數減1
     */
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
      /**
       * 若是當前線程不是第一個獲取讀鎖的線程
       * 須要更新線程持有鎖的重入次數
       * 若是次數小於等於0說明有異常,由於只有當前線程纔會出現持有鎖的重入次數等於0或者1
       */
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 修改state的值
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 若是是最後一個釋放讀鎖的線程nextc爲0,不然不是
            return nextc == 0;
    }
}

sync.tryReadLock方法

tryReadLock的代碼比較簡單,就直接在將解析過程在註釋中描述

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) { // 無限循環獲取讀鎖
        int c = getState();
        // 當前線程不是讀線程,直接返回失敗
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        // 讀鎖的總重入次數是否超過最大次數限制
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        /**
         * 經過CAS操做設置state的值,若是成功表示嘗試獲取讀鎖成功,須要作如下幾件事情:
         * 一、若是是第一獲取讀鎖要記錄第一個獲取讀鎖的線程信息
         * 二、若是是當前獲取鎖的線程和第一次獲取鎖的線程相同,須要更新第一獲取線程的重入次數
         * 三、更新獲取讀鎖線程相關的信息
         */
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

WriteLock類源碼解析

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    /**
     * 經過ReentrantReadWriteLock中的公平鎖或非公平鎖來初始化sync變量
     */
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    /**
     * 阻塞的方式獲取寫鎖
     */
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 中斷的方式獲取寫鎖
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * 嘗試獲取寫鎖
     */
    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }

    /**
     * 超時嘗試獲取寫鎖
     */
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * 釋放寫鎖
     */
    public void unlock() {
        sync.release(1);
    }
}

接下來,咱們重點看一下在公平鎖和非公平鎖下Sync.tryAcquireSync.tryReleaseSync.tryWriteLock這幾個核心方法是如何實現寫鎖的功能

Sync.tryAcquire方法

Sync.tryAcquire方法的邏輯比較簡單,就直接在代碼中註釋,具體以下:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 讀寫鎖的次數
    int c = getState();
    // 寫鎖的次數
    int w = exclusiveCount(c);
    /*
     * 若是讀寫鎖的次數不爲0,說明鎖可能有如下3中狀況:
     * 一、所有是讀線程佔用資源
     * 2. 所有是寫線程佔用資源
     * 3. 讀寫線程都佔用了資源(鎖降級:持有寫鎖的線程能夠去持有讀鎖),可是讀寫線程都是同一個線程
     */
    if (c != 0) {
        // 寫線程不佔用資源,第一個獲取鎖的線程也不是當前線程,直接獲取失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 檢查獲取寫鎖的線程是否超過了最大的重入次數
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 修改state的狀態,之因此沒有用CAS操做來修改,是由於寫線程只有一個,是獨佔的
        setState(c + acquires);
        return true;
    }
    /*
     * 寫線程是第一個競爭鎖資源的線程
     * 若是寫線程須要等待(公平鎖的狀況),或者
     * 寫線程的state設置失敗,直接返回false
     */
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 設置當前線程爲owner
    setExclusiveOwnerThread(current);
    return true;
}

Sync.tryRelease方法

Sync.tryRelease方便的代碼很簡單,直接看代碼中的註釋

protected final boolean tryRelease(int releases) {
    // 若是釋放鎖的線程不持有鎖,返回失敗
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取寫鎖的重入的次數
    int nextc = getState() - releases;
    // 若是次數爲0,須要釋放鎖的owner
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null); // 釋放鎖的owner
    setState(nextc);
    return free;
}

Sync.tryWriteLock方法

Sync.tryWriteLock這個方法也比較簡單,就直接上代碼了

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    // 讀鎖或者寫鎖已經被線程持有
    if (c != 0) {
        int w = exclusiveCount(c);
        // 寫鎖第一次獲取鎖或者當前線程不是第一次獲取寫鎖的線程(也就是否是owner),直接失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 超出寫鎖的最大次數,直接失敗
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    // 競爭寫鎖的線程修改state,
    // 若是成功將本身設置成鎖的owner,
    // 若是失敗直接返回
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current); // 設置當前線程持有鎖
    return true;
}

總結

  • 讀鎖和寫鎖的佔用(重入)次數都是共用state字段,高位記錄讀鎖,地位記錄寫鎖,因此讀鎖和寫鎖的最大佔用次數爲2^16
  • 讀鎖和寫鎖都是可重入的
  • 讀鎖是共享鎖,容許多個線程獲取
  • 寫鎖是排他鎖,只容許一個線程獲取
  • 一個線程獲取了讀鎖,在非公平鎖的狀況下,其餘等待獲取讀鎖的線程均可以嘗試獲取讀鎖,在公平鎖的狀況下,按照AQS同步隊列的順利來獲取,若是隊列前面有一個等待寫鎖的線程在排隊,則後面全部等待獲取讀鎖的線程都將沒法獲取讀鎖
  • 獲取讀鎖的線程,不能再去申請獲取寫鎖
  • 一個獲取了寫鎖的線程,在持有鎖的時候能夠去申請獲取讀鎖,在釋放寫鎖之後,還會繼續持有讀鎖,這就是所謂的鎖降級
  • 讀鎖沒法升級爲寫鎖,緣由是獲取讀鎖的線程多是多個,而寫鎖是獨佔的,不能多個線程持有,也就是說不支持鎖升級
相關文章
相關標籤/搜索