深刻理解 ReentrantReadWriteLock

ReentrantReadWriteLock

ReentrantLock 是排它鎖,它在同一時刻只容許一個線程進行訪問。在不少場景中,讀服務遠多於寫服務,而讀服務之間不存在數據競爭問題,在一個線程讀數據時禁止其餘讀線程訪問,會致使性能下降。java

因此就有了讀寫鎖,它在同一時刻能夠容許多個讀線程訪問,但在寫線程訪問時,則全部的讀線程和其餘寫線程都會被阻塞。讀寫鎖內部維護了一個讀鎖和一個寫鎖,如此將讀寫鎖分離,能夠很大地提高併發性和吞吐量。編程

ReadWriteLock緩存

ReadWriteLock 接口定義了讀鎖和寫鎖的兩個方法:併發

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}
複製代碼

其中 readLock() 方法用於返回讀操做的鎖,writeLock() 用於返回寫操做的鎖。app

實現類源碼分析

ReentrantReadWriteLock 實現了 ReadWriteLock 接口,它的幾個重要屬性以下:性能

// 內部類 ReadLock,讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 內部類 WriteLock 寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器,讀寫和寫鎖依賴於它
final Sync sync;
複製代碼

其中有兩個構造方法,主要以下:ui

public ReentrantReadWriteLock() {
    this(false);
}
// 指定公平性
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
複製代碼
public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ···
}

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ···
}
複製代碼

能夠看到,ReentrantReadWriteLock 鎖的主體依然是 Sync,讀鎖和寫鎖都依賴與 Sync 來實現,它們使用的是同一個鎖,只是在獲取鎖和釋放鎖的方式不一樣。this

讀寫狀態

ReentrantLock 中使用一個 int 型變量 state 來表示同步狀態,該值表示鎖被一個線程重複獲取的次數,而讀寫鎖中須要一個 int 型變量上維護多個讀線程和一個寫線程的狀態。spa

因此它將該變量分爲兩部分,高 16 位表示讀,低 16 位表示寫。分割以後經過位運算來計算讀鎖和寫鎖的狀態。

static final int SHARED_SHIFT   = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 讀鎖狀態
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 寫鎖狀態
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
複製代碼

內部工做狀態

另外,ReentrantReadWriteLock 還提供了返回內部工做狀態的方法。

方法名 描述
getReadLockCount 返回讀鎖被獲取的次數(鎖重入次數也會加 1)
isWriteLocked 返回寫鎖是否被獲取
getWriteHoldCount 返回當前線程獲取寫鎖的次數
getReadHoldCount 返回當前線程獲取讀鎖的次數

前面三個方法都比較簡單:

final int getReadLockCount() {
    return sharedCount(getState()); // c >>> SHARED_SHIFT
}

final boolean isWriteLocked() {
    return exclusiveCount(getState()) != 0;
}

// 因爲寫鎖只會被一個線程獲取
// 因此,若是是當前線程,則經過 c & EXCLUSIVE_MASK 直接計算便可
final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
複製代碼

最後一個方法,首先來看一下 Sync 類的幾個屬性:

// 當前線程持有的讀鎖數量
private transient ThreadLocalHoldCounter readHolds;

// HoldCounter 的一個緩存,減小 ThreadLocal.get 的次數
private transient HoldCounter cachedHoldCounter;
// 第一個獲取到讀鎖的讀線程
private transient Thread firstReader = null;
// 第一個讀線程持有的讀鎖數量
private transient int firstReaderHoldCount;
// 上面三個都是爲了提升效率,若是讀鎖僅有一個或有緩存了,就不用去 ThreadLocalHoldCounter 獲取

// 讀線程持有鎖的計數器,須要與線程綁定
static final class HoldCounter {
    int count = 0;
    // 持有線程 id,在釋放鎖時,判斷 cacheHoldCounter 緩存的是不是當前線程的讀鎖數量
    final long tid = getThreadId(Thread.currentThread());
}

// 經過 ThreadLocal 將 HoldCounter 綁定到線程上
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

Sync() {
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
}
複製代碼

getReadHoldCount() 方法用於獲取當前線程獲取讀鎖的次數。

final int getReadHoldCount() {
    // 若是讀鎖被獲取的次數爲 0,那麼當前線程獲取讀鎖的次數確定也爲 0
    if (getReadLockCount() == 0)
        return 0;

    Thread current = Thread.currentThread();
    // 若是當前線程是第一個獲取讀鎖的線程,則直接返回 firstReaderHoldCount
    if (firstReader == current)
        return firstReaderHoldCount;

    // 緩存的 HoldCounter 綁定的線程是不是當前線程,若是是則直接返回讀鎖數量
    HoldCounter rh = cachedHoldCounter;
    if (rh != null && rh.tid == getThreadId(current))
        return rh.count;

    // 不然從 ThreadLocalHoldCounter 中獲取 HoldCounter,再獲取讀鎖數量
    int count = readHolds.get().count;
    if (count == 0) readHolds.remove(); // 防止內存泄露
    return count;
}
複製代碼

寫鎖

寫鎖是一個支持可重入的排它鎖。

寫鎖的獲取

WriteLocklock() 方法以下,能夠看到,這裏調用的是 AQS 的獨佔式獲取鎖方法。

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
複製代碼

在獲取寫鎖時,調用 AQSacquire 方法,其中又調用了 Sync 自定義組件實現的 tryAcquire 方法:

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c); // 寫鎖個數
    if (c != 0) {
        // c != 0 && w == 0 表示有線程獲取了讀鎖
        // 或者當前線程不是持有鎖的線程,則失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 若是寫鎖會超過範圍,拋出異常
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 當前線程獲取寫鎖,可重入
        setState(c + acquires);
        return true;
    }
    // 若是沒有任何線程獲取讀鎖和寫鎖,當前線程嘗試獲取寫鎖
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
複製代碼

這裏若是有線程獲取了讀鎖,則當前線程不能再獲取寫鎖。由於讀寫鎖須要確保獲取寫鎖的線程的操做對於讀鎖的線程是可見的,若是存在讀鎖時再容許獲取寫鎖,則獲取讀鎖的線程可能沒法得知當前獲取寫鎖的線程的操做。

判斷獲取寫鎖的線程是否應該被阻塞,公平鎖和非公平中實現不一樣。

static final class NonfairSync extends Sync {
    // 對於非公平鎖,直接返回 false
    final boolean writerShouldBlock() {
        return false;
    }
}

static final class FairSync extends Sync {
    // 對於公平鎖,則須要判斷是否有前驅節點
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
複製代碼

寫鎖的釋放

unlock() 方法以下,其中調用了 AQSrelease 方法:

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 喚醒後繼節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼

release() 方法首先調用 Sync 中的 tryRelease() 方法,而後喚醒後繼節點:

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
複製代碼

該方法首先減小寫狀態值,若是寫狀態爲 0,則表示寫鎖已經被釋放,將持有鎖的線程設置爲 null,並更改同步狀態值。

讀鎖

讀鎖是一個支持可重入的共享鎖,它能被多個線程同時獲取。

讀鎖的獲取

ReadLocklock() 方法以下,其中調用了 AQS 的共享式獲取鎖方法:

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
複製代碼

acquireShared 方法中,又調用了 SynctryAcquireShared 方法:

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 存在寫鎖,而且寫鎖被其餘線程持有,則失敗
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    // 獲取讀鎖的線程是否須要阻塞
    // 讀鎖小於 MAX_COUNT(1 << 16)
    // 使用 CAS 更新狀態爲 c + 1 << 16
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // 沒有讀鎖
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 讀鎖僅有一個
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                //更新緩存
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 將 HoldCounter 設置到 ThreadLocal 中
                readHolds.set(rh);
            // 讀鎖數量加 1
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
複製代碼

該方法中,若是知足上述三個條件,則獲取讀鎖成功,會對 firstReaderHoldCount 等值進行設置,稍後詳細介紹。若是不知足時,會調用 fullTryAcquireShared 方法:

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        // 若是寫鎖不爲 0
        if (exclusiveCount(c) != 0) {
            // 當前線程不是持有寫鎖的線程,返回
            if (getExclusiveOwnerThread() != current)
                return -1;
        // 讀鎖是否須要被阻塞
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        // 讀鎖超出最大範圍
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 使用 CAS 更新狀態值,嘗試獲取鎖
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
複製代碼

判斷讀鎖是否應該被阻塞,公平鎖和非公平鎖實現不一樣,

static final class NonfairSync extends Sync {
    // 對於非公平鎖,須要判斷同步隊列中第一個結點是不是獨佔式(寫鎖)
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

static final class FairSync extends Sync {
    // 對於公平鎖,須要判斷是否有前驅節點
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
複製代碼

讀鎖的釋放

ReadLockunlock 方法以下,其中調用的是 AQS 的共享式釋放鎖方法:

public void unlock() {
    sync.releaseShared(1);
}
    
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

releaseShared 方法中又調用了 SynctryReleaseShared 方法:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 若是當前線程是第一個獲取讀鎖的線程
    if (firstReader == current) {
        if (firstReaderHoldCount == 1)
            // 僅獲取了一次,將 firstReader 置爲 null
            firstReader = null;
        else
            // 不然將 firstReadHoldCount 減 1
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            // 緩存若是有效,直接使用;不然從新獲取
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 循環使用 CAS 更新狀態值
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
複製代碼

鎖降級

ReentrantReadWriteLock 容許鎖降級,也就是寫鎖降級爲讀鎖。它是指先獲取寫鎖,再獲取到讀鎖,最後釋放寫鎖的過程。但鎖升級是不容許的,也就是先獲取讀鎖,再獲取寫鎖,最後釋放讀鎖的過程。

在獲取讀鎖的 tryAcquireShared 方法中:

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 存在寫鎖,而且寫鎖被其餘線程持有,則失敗
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    ···
}
複製代碼

若是存在寫鎖,而且寫鎖被其餘線程持有時,纔會失敗。說明若是當前線程持有了寫鎖,也能夠再獲取讀鎖。最後釋放寫鎖,這稱爲鎖降級。

爲什麼要這樣作呢?試想若是一個線程獲取了寫鎖,這個時候其餘任何線程都是沒法再獲取讀鎖或寫鎖的,而後該線程再去獲取讀鎖,也就不會產生任何的競爭。經過這種鎖降級機制,就不會有釋放寫鎖後,再去競爭獲取讀鎖的狀況,避免了鎖的競爭和線程的上下文切換,也就提升了效率。

參考資料

相關文章
相關標籤/搜索