Java讀寫鎖的原理實現

Java讀寫鎖的原理實現

 

最近作的一個小項目中有這樣的需求:java

整個項目有一份 config.json 保存着項目的一些配置,是存儲在本地文件的一個資源,而且應用中存在讀寫(讀>>寫)更新問題。既然讀寫併發操做,那麼就涉及到操做互斥,這裏天然想到了讀寫鎖,也順便對本身讀寫鎖方面的知識作個梳理。編程

爲何須要讀寫鎖?json

與傳統鎖不一樣的是讀寫鎖的規則是能夠共享讀,但只能一個寫,總結起來爲: 讀讀不互斥,讀寫互斥,寫寫互斥 ,而通常的獨佔鎖是: 讀讀互斥,讀寫互斥,寫寫互斥 ,而場景中每每 讀遠遠大於寫 ,讀寫鎖就是爲了這種優化而建立出來的一種機制。併發

注意是 讀遠遠大於寫 ,通常狀況下獨佔鎖的效率低來源於高併發下對臨界區的激烈競爭致使線程上下文切換。所以當併發不是很高的狀況下,讀寫鎖因爲須要額外維護讀鎖的狀態,可能還不如獨佔鎖的效率高。所以須要根據實際狀況選擇使用。app

一個簡單的讀寫鎖實現函數

根據上面理論能夠利用兩個int變量來簡單實現一個讀寫鎖,實現雖然爛,可是原理都是差很少的,值得閱讀下。高併發

public class ReadWriteLock {
 /**
 * 讀鎖持有個數
 */
 private int readCount = 0;
 /**
 * 寫鎖持有個數
 */
 private int writeCount = 0;
 /**
 * 獲取讀鎖,讀鎖在寫鎖不存在的時候才能獲取
 */
 public synchronized void lockRead() throws InterruptedException {
 // 寫鎖存在,須要wait
 while (writeCount > 0) {
 wait();
 }
 readCount++;
 }
 /**
 * 釋放讀鎖
 */
 public synchronized void unlockRead() {
 readCount--;
 notifyAll();
 }
 /**
 * 獲取寫鎖,當讀鎖存在時須要wait.
 */
 public synchronized void lockWrite() throws InterruptedException {
 // 先判斷是否有寫請求
 while (writeCount > 0) {
 wait();
 }
 // 此時已經不存在獲取寫鎖的線程了,所以佔坑,防止寫鎖飢餓
 writeCount++;
 // 讀鎖爲0時獲取寫鎖
 while (readCount > 0) {
 wait();
 }
 }
 /**
 * 釋放讀鎖
 */
 public synchronized void unlockWrite() {
 writeCount--;
 notifyAll();
 }
}

ReadWriteLock的實現原理優化

在Java中 ReadWriteLock 的主要實現爲 ReentrantReadWriteLock ,其提供瞭如下特性:ui

  1. 公平性選擇:支持公平與非公平(默認)的鎖獲取方式,吞吐量非公平優先於公平。
  2. 可重入:讀線程獲取讀鎖以後能夠再次獲取讀鎖,寫線程獲取寫鎖以後能夠再次獲取寫鎖
  3. 可降級:寫線程獲取寫鎖以後,其還能夠再次獲取讀鎖,而後釋放掉寫鎖,那麼此時該線程是讀鎖狀態,也就是降級操做。

ReentrantReadWriteLock的結構this

ReentrantReadWriteLock 的核心是由一個基於AQS的同步器 Sync 構成,而後由其擴展出 ReadLock (共享鎖), WriteLock (排它鎖)所組成。

Java讀寫鎖的原理實現

 

而且從 ReentrantReadWriteLock 的構造函數中能夠發現 ReadLock 與 WriteLock 使用的是同一個Sync,具體怎麼實現同一個隊列既能夠爲共享鎖,又能夠表示排他鎖下文會具體分析。

清單一:ReentrantReadWriteLock構造函數

public ReentrantReadWriteLock(boolean fair) {
 sync = fair ? new FairSync() : new NonfairSync();
 readerLock = new ReadLock(this);
 writerLock = new WriteLock(this);
 }

Sync的實現

sync 是讀寫鎖實現的核心, sync 是基於AQS實現的,在AQS中核心是state字段和雙端隊列,那麼一個一個問題來分析。

Sync是如何同時表示讀鎖與寫鎖?

清單2:讀寫鎖狀態獲取

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

從代碼中獲取讀寫狀態能夠看出其是把 state(int32位) 字段分紅高16位與低16位,其中高16位表示讀鎖個數,低16位表示寫鎖個數,以下圖所示(圖來自 Java併發編程藝術 )。

Java讀寫鎖的原理實現

 

該圖表示當前一個線程獲取到了寫鎖,而且重入了兩次,所以低16位是3,而且該線程又獲取了讀鎖,而且重入了一次,因此高16位是2,當寫鎖被獲取時若是讀鎖不爲0那麼讀鎖必定是獲取寫鎖的這個線程。

讀鎖的獲取

讀鎖的獲取主要實現是AQS中的 acquireShared 方法,其調用過程以下代碼。

清單3:讀鎖獲取入口

// ReadLock
public void lock() {
 sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
 if (tryAcquireShared(arg) < 0)
 doAcquireShared(arg);
}

其中 doAcquireShared(arg) 方法是獲取失敗以後AQS中入隊操做,等待被喚醒後從新獲取,那麼關鍵點就是 tryAcquireShared(arg) 方法,方法有點長,所以先總結出獲取讀鎖所經歷的步驟,獲取的第一部分步驟以下:

  • 操做1:讀寫須要互斥,所以當存在寫鎖而且持有寫鎖的線程不是該線程時獲取失敗。
  • 操做2:是否存在等待寫鎖的線程,存在的話則獲取讀鎖須要等待,避免寫鎖飢餓。(寫鎖優先級是比較高的)
  • 操做3:CAS獲取讀鎖,其實是state字段的高16位自增。
  • 操做4:獲取成功後再ThreadLocal中記錄當前線程獲取讀鎖的次數。

清單4:讀鎖獲取的第一部分

protected final int tryAcquireShared(int unused) {
 Thread current = Thread.currentThread();
 int c = getState();
 // 操做1:存在寫鎖,而且寫鎖不是當前線程則直接去排隊
 if (exclusiveCount(c) != 0 &&
 getExclusiveOwnerThread() != current)
 return -1;
 int r = sharedCount(c);
 // 操做2:讀鎖是否該阻塞,對於非公平模式下寫鎖獲取優先級會高,若是存在要獲取寫鎖的線程則讀鎖須要讓步,公平模式下則先來先到
 if (!readerShouldBlock() && 
 // 讀鎖使用高16位,所以存在獲取上限爲2^16-1
 r < MAX_COUNT &&
 // 操做3:CAS修改讀鎖狀態,其實是讀鎖狀態+1
 compareAndSetState(c, c + SHARED_UNIT)) {
 // 操做4:執行到這裏說明讀鎖已經獲取成功,所以須要記錄線程狀態。
 if (r == 0) {
 firstReader = current; // firstReader是把讀鎖狀態從0變成1的那個線程
 firstReaderHoldCount = 1;
 } else if (firstReader == current) { 
 firstReaderHoldCount++;
 } else {
 // 這些代碼其實是從ThreadLocal中獲取當前線程重入讀鎖的次數,而後自增下。
 HoldCounter rh = cachedHoldCounter; // cachedHoldCounter是上一個獲取鎖成功的線程
 if (rh == null || rh.tid != getThreadId(current))
 cachedHoldCounter = rh = readHolds.get();
 else if (rh.count == 0)
 readHolds.set(rh);
 rh.count++;
 }
 return 1;
 }
 // 當操做2,操做3失敗時執行該邏輯
 return fullTryAcquireShared(current);
 }

當操做2,操做3失敗時會執行 fullTryAcquireShared(current) ,爲何會這樣寫呢?我的認爲是一種補償操做, 操做2與操做3失敗並不表明當前線程沒有讀鎖的資格 ,而且這裏的讀鎖是共享鎖,有資格就應該被獲取成功,所以給予補償獲取讀鎖的操做。在 fullTryAcquireShared(current) 中是一個循環獲取讀鎖的過程,大體步驟以下:

  • 操做5:等同於操做2,存在寫鎖,且寫鎖線程並不是當前線程則直接返回失敗
  • 操做6:當前線程是重入讀鎖,這裏只會偏向第一個獲取讀鎖的線程以及最後一個獲取讀鎖的線程,其餘都須要去AQS中排隊。
  • 操做7:CAS改變讀鎖狀態
  • 操做8:同操做4,獲取成功後再ThreadLocal中記錄當前線程獲取讀鎖的次數。

清單5:讀鎖獲取的第二部分

final int fullTryAcquireShared(Thread current) {
 HoldCounter rh = null;
 // 最外層嵌套循環
 for (;;) {
 int c = getState();
 // 操做5:存在寫鎖,且寫鎖並不是當前線程則直接返回失敗
 if (exclusiveCount(c) != 0) {
 if (getExclusiveOwnerThread() != current)
 return -1;
 // else we hold the exclusive lock; blocking here
 // would cause deadlock.
 // 操做6:若是當前線程是重入讀鎖則放行
 } else if (readerShouldBlock()) {
 // Make sure we're not acquiring read lock reentrantly
 // 當前是firstReader,則直接放行,說明是已獲取的線程重入讀鎖
 if (firstReader == current) {
 // assert firstReaderHoldCount > 0;
 } else {
 // 執行到這裏說明是其餘線程,若是是cachedHoldCounter(其count不爲0)也就是上一個獲取鎖的線程則能夠重入,不然進入AQS中排隊
 // **這裏也是對寫鎖的讓步**,若是隊列中頭結點爲寫鎖,那麼當前獲取讀鎖的線程要進入隊列中排隊
 if (rh == null) {
 rh = cachedHoldCounter;
 if (rh == null || rh.tid != getThreadId(current)) {
 rh = readHolds.get();
 if (rh.count == 0)
 readHolds.remove();
 }
 }
 // 說明是上述剛初始化的rh,因此直接去AQS中排隊
 if (rh.count == 0)
 return -1;
 }
 }
 if (sharedCount(c) == MAX_COUNT)
 throw new Error("Maximum lock count exceeded");
 // 操做7:修改讀鎖狀態,實際上讀鎖自增操做
 if (compareAndSetState(c, c + SHARED_UNIT)) {
 // 操做8:對ThreadLocal中維護的獲取鎖次數進行更新。
 if (sharedCount(c) == 0) {
 firstReader = current;
 firstReaderHoldCount = 1;
 } else if (firstReader == current) {
 firstReaderHoldCount++;
 } else {
 if (rh == null)
 rh = cachedHoldCounter;
 if (rh == null || rh.tid != getThreadId(current))
 rh = readHolds.get();
 else if (rh.count == 0)
 readHolds.set(rh);
 rh.count++;
 cachedHoldCounter = rh; // cache for release
 }
 return 1;
 }
 }
 }

讀鎖的釋放

清單6:讀鎖釋放入口

// ReadLock
public void unlock() {
 sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
 if (tryReleaseShared(arg)) {
 doReleaseShared(); // 這裏其實是釋放讀鎖後喚醒寫鎖的線程操做
 return true;
 }
 return false;
}

讀鎖的釋放主要是 tryReleaseShared(arg) 函數,所以拆解其步驟以下:

  • 操做1:清理ThreadLocal中保存的獲取鎖數量信息
  • 操做2:CAS修改讀鎖個數,其實是自減一

清單7:讀鎖的釋放流程

protected final boolean tryReleaseShared(int unused) {
 Thread current = Thread.currentThread();
 // 操做1:清理ThreadLocal對應的信息
 if (firstReader == current) {;
 if (firstReaderHoldCount == 1)
 firstReader = null;
 else
 firstReaderHoldCount--;
 } else {
 HoldCounter rh = cachedHoldCounter;
 if (rh == null || rh.tid != getThreadId(current))
 rh = readHolds.get();
 int count = rh.count;
 // 已釋放完的讀鎖的線程清空操做
 if (count <= 1) {
 readHolds.remove();
 // 若是沒有獲取鎖卻釋放則會報該錯誤
 if (count <= 0)
 throw unmatchedUnlockException();
 }
 --rh.count;
 }
 // 操做2:循環中利用CAS修改讀鎖狀態
 for (;;) {
 int c = getState();
 int nextc = c - SHARED_UNIT;
 if (compareAndSetState(c, nextc))
 // Releasing the read lock has no effect on readers,
 // but it may allow waiting writers to proceed if
 // both read and write locks are now free.
 return nextc == 0;
 }
 }

寫鎖的獲取

清單8:寫鎖的獲取入口

// WriteLock
 public void lock() {
 sync.acquire(1);
 }
// AQS
 public final void acquire(int arg) {
 // 嘗試獲取,獲取失敗後入隊,入隊失敗則interrupt當前線程
 if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 selfInterrupt();
 }

寫鎖的獲取也主要是 tryAcquire(arg) 方法,這裏也拆解步驟:

  • 操做1:若是讀鎖數量不爲0或者寫鎖數量不爲0,而且不是重入操做,則獲取失敗。
  • 操做2:若是當前鎖的數量爲0,也就是不存在操做1的狀況,那麼該線程是有資格獲取到寫鎖,所以修改狀態,設置獨佔線程爲當前線程

清單9:寫鎖的獲取

protected final boolean tryAcquire(int acquires) {
 Thread current = Thread.currentThread();
 int c = getState();
 int w = exclusiveCount(c);
 // 操做1:c != 0,說明存在讀鎖或者寫鎖
 if (c != 0) {
 // (Note: if c != 0 and w == 0 then shared count != 0) 
 // 寫鎖爲0,讀鎖不爲0 或者獲取寫鎖的線程並非當前線程,直接失敗
 if (w == 0 || current != getExclusiveOwnerThread())
 return false;
 if (w + exclusiveCount(acquires) > MAX_COUNT)
 throw new Error("Maximum lock count exceeded");
 // Reentrant acquire
 // 執行到這裏說明是寫鎖線程的重入操做,直接修改狀態,也不須要CAS由於沒有競爭
 setState(c + acquires);
 return true;
 }
 // 操做2:獲取寫鎖,writerShouldBlock對於非公平模式直接返回fasle,對於公平模式則線程須要排隊,所以須要阻塞。
 if (writerShouldBlock() ||
 !compareAndSetState(c, c + acquires))
 return false;
 setExclusiveOwnerThread(current);
 return true;
}

寫鎖的釋放

清單10:寫鎖的釋放入口

// WriteLock
public void unlock() {
 sync.release(1);
 }
// AQS
public final boolean release(int arg) {
 // 釋放鎖成功後喚醒隊列中第一個線程
 if (tryRelease(arg)) {
 Node h = head;
 if (h != null && h.waitStatus != 0)
 unparkSuccessor(h);
 return true;
 }
 return false;
}

寫鎖的釋放主要是 tryRelease(arg) 方法,其邏輯就比較簡單了,註釋很詳細。

清單11:寫鎖的釋放

protected final boolean tryRelease(int releases) {
 // 若是當前線程沒有獲取寫鎖卻釋放,則直接拋異常
 if (!isHeldExclusively())
 throw new IllegalMonitorStateException();
 // 狀態變動至nextc
 int nextc = getState() - releases;
 // 由於寫鎖是能夠重入,因此在都釋放完畢後要把獨佔標識清空
 boolean free = exclusiveCount(nextc) == 0;
 if (free)
 setExclusiveOwnerThread(null);
 // 修改狀態
 setState(nextc);
 return free;
 }

一些其餘問題

鎖降級操做哪裏體現?

鎖降級操做指的是一個線程獲取寫鎖以後再獲取讀鎖,而後讀鎖釋放掉寫鎖的過程。在 tryAcquireShared(arg) 獲取讀鎖的代碼中有以下代碼。

清單12:寫鎖降級策略

Thread current = Thread.currentThread();
 // 當前狀態
 int c = getState();
 // 存在寫鎖,而且寫鎖不等於當前線程時返回,換句話說等寫鎖爲當前線程時則能夠繼續往下獲取讀鎖。
 if (exclusiveCount(c) != 0 &&
 getExclusiveOwnerThread() != current)
 return -1;
。。。。。讀鎖獲取。。。。。

那麼鎖降級有什麼用?答案是爲了可見性的保證。在 ReentrantReadWriteLock 的javadoc中有以下代碼,其是鎖降級的一個應用示例。

class CachedData {
 Object data;
 volatile boolean cacheValid;
 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
 
 void processCachedData() {
 // 獲取讀鎖
 rwl.readLock().lock();
 if (!cacheValid) {
 // Must release read lock before acquiring write lock,不釋放的話下面寫鎖會獲取不成功,形成死鎖
 rwl.readLock().unlock();
 // 獲取寫鎖
 rwl.writeLock().lock();
 try {
 // Recheck state because another thread might have
 // acquired write lock and changed state before we did.
 if (!cacheValid) {
 data = ...
 cacheValid = true;
 }
 // Downgrade by acquiring read lock before releasing write lock
 // 這裏再次獲取讀鎖,若是不獲取那麼當寫鎖釋放後可能其餘寫線程再次得到寫鎖,致使下方`use(data)`時出現不一致的現象
 // 這個操做就是降級
 rwl.readLock().lock();
 } finally {
 rwl.writeLock().unlock(); // Unlock write, still hold read
 }
 }
 try {
 // 使用完後釋放讀鎖
 use(data);
 } finally {
 rwl.readLock().unlock();
 }
 }
 }}

公平與非公平的區別

清單13:公平下的Sync

static final class FairSync extends Sync {
 private static final long serialVersionUID = -2274990926593161451L;
 final boolean writerShouldBlock() {
 return hasQueuedPredecessors(); // 隊列中是否有元素,有責當前操做須要block
 }
 final boolean readerShouldBlock() {
 return hasQueuedPredecessors();// 隊列中是否有元素,有責當前操做須要block
 }
 }

公平下的Sync實現策略是全部獲取的讀鎖或者寫鎖的線程都須要入隊排隊,按照順序依次去嘗試獲取鎖。

清單14:非公平下的Sync

static final class NonfairSync extends Sync {
 private static final long serialVersionUID = -8159625535654395037L;
 final boolean writerShouldBlock() {
 // 非公平下不考慮排隊,所以寫鎖能夠競爭獲取
 return false; // writers can always barge
 }
 final boolean readerShouldBlock() {
 /* As a heuristic to avoid indefinite writer starvation,
 * block if the thread that momentarily appears to be head
 * of queue, if one exists, is a waiting writer. This is
 * only a probabilistic effect since a new reader will not
 * block if there is a waiting writer behind other enabled
 * readers that have not yet drained from the queue.
 */
 // 這裏其實是一個優先級,若是隊列中頭部元素時寫鎖,那麼讀鎖須要等待,避免寫鎖飢餓。
 return apparentlyFirstQueuedIsExclusive();
 }
 }

非公平下因爲搶佔式獲取鎖,寫鎖是可能產生飢餓,所以解決辦法就是提升寫鎖的優先級,換句話說獲取寫鎖以前先佔坑。

相關文章
相關標籤/搜索