在前幾篇文章中瞭解了ReentrantLock、Semaphore與CountDownLatch後,J.U.C包中基於AQS實現的併發工具類還剩一個比較重要的:讀寫鎖ReentrantReadWriteLock。讀寫鎖在Java面試過程當中是一個常常性考的題目,他涉及到的知識點比較多,致使不少人不能透徹的理解他。舉幾個面試中常見的問題:java
上面的問題你們都能回答出來嗎?若是都能很好的回答,那麼這篇文章也許對你沒有太大幫助。若是不能,彆着急,下面就來一一分析解答上面的幾個問題。面試
這個問題你們應該都會回答:ReentrantLock是獨佔鎖,ReentrantReadWriteLock是讀寫鎖。那麼這就引伸出下一個問題:ReentrantReadWriteLock中的讀鎖和寫鎖分別是獨佔仍是共享的?他們之間的關係又是什麼樣的?要了解透徹這兩個問題,分析源碼是再好不過的了。緩存
使用ReentrantReadWriteLock讀寫鎖的方式,會調用readLock()和writeLock()兩個方法,看下他們的源碼:多線程
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
能夠看到用到了WriteLock和ReadLock兩個靜態內部類,他們對鎖的實現以下:併發
public static class ReadLock implements Lock, java.io.Serializable { public void lock() { sync.acquireShared(1); //共享 } public void unlock() { sync.releaseShared(1); //共享 } } public static class WriteLock implements Lock, java.io.Serializable { public void lock() { sync.acquire(1); //獨佔 } public void unlock() { sync.release(1); //獨佔 } } abstract static class Sync extends AbstractQueuedSynchronizer {}
看到這裏發現了ReentrantReadWriteLock和ReentrantLock的一個相同點和不一樣點,相同的是使用了同一個關鍵實現AbstractQueuedSynchronizer,不一樣的是ReentrantReadWriteLock使用了兩個鎖分別實現了AQS,並且WriteLock和ReentrantLock同樣,使用了獨佔鎖。而ReadLock和Semaphore同樣,使用了共享鎖。再往下的內容估計看過前面幾篇文章的都很熟悉了,獨佔鎖經過state變量的0和1兩個狀態來控制是否有線程佔有鎖,共享鎖經過state變量0或者非0來控制多個線程訪問。在上面的代碼中,ReadLock和WriteLock使用了同一個AQS,那麼在ReentrantReadWriteLock中又是怎麼控制讀鎖和寫鎖關係的呢?ide
讀寫鎖定義爲:一個資源可以被多個讀線程訪問,或者被一個寫線程訪問,可是不能同時存在讀寫線程。工具
經過這句話很容易聯想到經過兩個不一樣的變量來控制讀寫,當獲取到讀鎖時對讀變量+1,當獲取懂啊寫變量時對寫變量+1。但AQS中並無爲ReadLock和WriteLock添加額外的變量,仍是經過一個state來實現的。那是怎麼作到讀寫分離的呢?來看看下面這段代碼:1。但AQS中並無爲ReadLock和WriteLock添加額外的變量,仍是經過一個state來實現的。那是怎麼作到讀寫分離的呢?來看看下面這段代碼:測試
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
這段代碼在Sync靜態內部類中,這裏有兩個關鍵方法sharedCount和exclusiveCount,經過名字能夠看出sharedCount是共享鎖的數量,exclusiveCount是獨佔鎖的數量。共享鎖經過對c像右位移16位得到,獨佔鎖經過和16位的1與運算得到。舉個例子,當獲取讀鎖的線程有3個,寫鎖的線程有1個(固然這是不可能同時有的),state就表示爲0000 0000 0000 0011 0000 0000 0000 0001,高16位表明讀鎖,經過向右位移16位(c >>> SHARED_SHIFT)得倒10進制的3,經過和0000 0000 0000 0000 1111 1111 1111 1111與運算(c & EXCLUSIVE_MASK),得到10進制的1。弄懂了着幾個方法,就明白了爲何經過一個state實現了讀寫共享。ui
這當中還有一個問題,因爲16位最大全1表示爲65535,因此讀鎖和寫鎖最多能夠獲取65535個。線程
上面說過,WriteLock也是獨佔鎖,那麼他和ReentrantLock有什麼區別呢?最大的區別就在獲取鎖時WriteLock不只須要考慮是否有其餘寫鎖佔用,同時還要考慮是否有其餘讀鎖,而ReentrantLock只須要考慮自身是否被佔用就好了。來看下WriteLock獲取鎖的源代碼:
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && //嘗試獲取獨佔鎖 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //獲取失敗後排隊 selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //獲取共享變量state int w = exclusiveCount(c); //獲取寫鎖數量 if (c != 0) { //有讀鎖或者寫鎖 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) //寫鎖爲0(證實有讀鎖),或者持有寫鎖的線程不爲當前線程 return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); //當前線程持有寫鎖,爲重入鎖,+acquires便可 return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) //CAS操做失敗,多線程狀況下被搶佔,獲取鎖失敗。CAS成功則獲取鎖成功 return false; setExclusiveOwnerThread(current); return true; }
這段代碼是否是很熟悉?和ReentrantLock中獲取鎖的代碼很類似,差異在於其中調用了exclusiveCount方法來獲取是否存在寫鎖,而後經過c != 0和w == 0判斷了是否存在讀鎖。acquireQueued和addWaiter就不詳細解說了,須要瞭解的能夠查看前面ReentrantLock的文章。
到這裏你們應該對ReentrantReadWriteLock和ReentrantLock的區別應該作到內心有數了吧。
WriteLock是獨佔模式,咱們比較了它和ReentrantLock獨佔鎖獲取鎖的區別,這裏咱們再看看ReadLock在獲取鎖上有什麼不一樣呢?先看下面的源代碼:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //寫鎖不等於0的狀況下,驗證是不是當前寫鎖嘗試獲取讀鎖 return -1; int r = sharedCount(c); //獲取讀鎖數量 if (!readerShouldBlock() && //讀鎖不須要阻塞 r < MAX_COUNT && //讀鎖小於最大讀鎖數量 compareAndSetState(c, c + SHARED_UNIT)) { //CAS操做嘗試設置獲取讀鎖 也就是高位加1 if (r == 0) { //當前線程第一個而且第一次獲取讀鎖, firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //當前線程是第一次獲取讀鎖的線程 firstReaderHoldCount++; } else { // 當前線程不是第一個獲取讀鎖的線程,放入線程本地變量 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }ß
在上面的代碼中嘗試獲取讀鎖的過程和獲取寫鎖的過程也很類似,不一樣在於讀鎖只要沒有寫鎖佔用而且不超過最大獲取數量均可以嘗試獲取讀鎖,而寫鎖不只須要考慮讀鎖是否佔用,也要考慮寫鎖是否佔用。上面的代碼中firstReader,firstReaderHoldCount以及cachedHoldCounter都是爲readHolds(ThreadLocalHoldCounter)服務的,用來記錄每一個讀鎖獲取線程的獲取次數,方便獲取當前線程持有鎖的次數信息。在ThreadLocal基礎上添加了一個Int變量來統計次數,能夠經過他們的實現來理解:
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //ThreadLocal變量ß public HoldCounter initialValue() { return new HoldCounter(); } } static final class HoldCounter { int count = 0; //當前線程持有鎖的次數 // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); //當前線程ID }
先來分析一個簡單緩存須要知足的功能,這裏咱們爲了實現簡單,不考慮緩存過時策略等複雜因素。
public static void ReentrantReadWriteLockCacheSystem() { //這裏爲了實現簡單,將緩存大小設置爲4。 Map<String, String> cacheMap = new HashMap<>(4); ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); for (int i = 0; i < 20; i++) { //同時開啓20個線程訪問緩存 final String key = String.valueOf(i % 4); Thread thread = new Thread(new Runnable() { @Override public void run() { try { //①讀取緩存時獲取讀鎖 readWriteLock.readLock().lock(); //獲取讀鎖後經過key獲取緩存中的值 String valueStr = cacheMap.get(key); //緩存值不存在 if (valueStr == null) { //③釋放讀鎖後再嘗試獲取寫鎖 readWriteLock.readLock().unlock(); try { //④獲取寫鎖來寫入不存在的key值, readWriteLock.writeLock().lock(); valueStr = cacheMap.get(key); if (valueStr == null) { valueStr = key + " --- value"; cacheMap.put(key, valueStr); //寫入值 System.out.println(Thread.currentThread().getName() + " --------- put " + valueStr); } // ⑥鎖降級,避免被其餘寫線程搶佔後再次更新值,保證這一次操做的原子性 readWriteLock.readLock().lock(); System.out.println(Thread.currentThread().getName() + " --------- get new " + valueStr); } finally { readWriteLock.writeLock().unlock(); //⑤釋放寫鎖 } } else { System.out.println(Thread.currentThread().getName() + " ------ get cache value"); } } finally { readWriteLock.readLock().unlock(); //②釋放讀鎖 } } }, String.valueOf(i)); thread.start(); } }
首先線程會嘗試去獲取數據,須要獲取讀鎖①,若是存在值,則直接讀取並釋放讀鎖②。若是不存在值,則首先釋放已經獲取的讀鎖③,而後嘗試獲取寫鎖④。獲取到寫鎖以後,再次檢查值,由於此時可能存在其餘寫鎖已經更新值,這時只須要讀取,而後釋放寫鎖⑤。若是仍是沒有值,則經過其餘途徑獲取值並更新而後獲取讀鎖⑥,這一步鎖降級操做是爲了直接搶佔讀鎖,避免釋放寫鎖以後再次獲取讀鎖時被其餘寫線程搶佔,這樣保證了這一次讀取數據的原子性。以後再執行⑤釋放寫鎖和②釋放讀鎖。
執行後輸出結果以下,每次執行可能輸出不一樣:
//1 --------- put 1 --- value //1 --------- get new 1 --- value //0 --------- put 0 --- value //0 --------- get new 0 --- value //9 ------ get cache value //4 ------ get cache value //2 --------- put 2 --- value //2 --------- get new 2 --- value //11 --------- put 3 --- value //11 --------- get new 3 --- value //5 ------ get cache value //13 ------ get cache value //6 ------ get cache value //8 ------ get cache value //7 ------ get cache value //3 --------- get new 3 --- value //10 ------ get cache value //12 ------ get cache value //14 ------ get cache value //15 ------ get cache value //16 ------ get cache value //17 ------ get cache value //18 ------ get cache value //19 ------ get cache value
通過上面對讀寫鎖原理的初步分析和使用,如今你能本身實現一個簡單的讀寫鎖了嗎?這裏列出了一步步實現一個簡單的讀寫鎖的過程,你能夠按這個步驟本身先實現一遍。
public class MyReadWriteLock { private int state = 0; //1. 定義一個讀寫鎖共享變量state //2. state高16位爲讀鎖數量 private int GetReadCount() { return state >>> 16; } //2. 低16位爲寫鎖數量 private int GetWriteCount() { return state & ((1 << 16) - 1); } //3. 獲取讀鎖時先判斷是否有寫鎖,有則等待,沒有則將讀鎖數量加1 public synchronized void lockRead() throws InterruptedException{ while (GetWriteCount() > 0) { wait(); } System.out.println("lockRead ---" + Thread.currentThread().getName()); state = state + (1 << 16); } //4. 釋放讀鎖數量減1,通知全部等待線程 public synchronized void unlockRead() { state = state - ((1 << 16)); notifyAll(); } //5. 獲取寫鎖時須要判斷讀鎖和寫鎖是否都存在,有則等待,沒有則將寫鎖數量加1 public synchronized void lockWriters() throws InterruptedException{ while (GetReadCount() > 0 || GetWriteCount() > 0) { wait(); } System.out.println("lockWriters ---" + Thread.currentThread().getName()); state++; } //6. 釋放寫鎖數量減1,通知全部等待線程 public synchronized void unlockWriters(){ state--; notifyAll(); } }
在讀寫的過程當中,寫操做通常是優先的,不能由於讀操做過多,寫操做一直等待的情況發生,這樣會致使數據一直得不到更新,發生寫飢餓。如今你們思考一下上面咱們實現的簡單讀寫鎖,是否能作到這點呢?答案很明顯,在讀寫線程都wait的狀況下,經過notifyAll並不能保證寫優先執行。那在這個例子中怎麼改進這一點呢?
這裏我經過添加一箇中間變量來達到這個目的,這個中間變量在獲取寫鎖以前先記錄一個寫請求,這樣一旦notifyAll,優先檢查是否有寫請求,若是有,則讓寫操做優先執行,具體代碼以下:
public class MyReadWriteLock { private int state = 0; //1. 定義一個讀寫鎖共享變量state private int writeRequest = 0; //記錄寫請求數量 //2. state高16位爲讀鎖數量 private int GetReadCount() { return state >>> 16; } //2. 低16位爲寫鎖數量 private int GetWriteCount() { return state & ((1 << 16) - 1); } //3. 獲取讀鎖時先判斷是否有寫鎖,有則等待,沒有則將讀鎖數量加1 public synchronized void lockRead() throws InterruptedException{ //寫鎖數量大於0或者寫請求數量大於0的狀況下都優先執行寫 while (GetWriteCount() > 0 || writeRequest > 0) { wait(); } System.out.println("lockRead ---" + Thread.currentThread().getName()); state = state + (1 << 16); } //4. 釋放讀鎖數量減1,通知全部等待線程 public synchronized void unlockRead() { state = state - ((1 << 16)); notifyAll(); } //5. 獲取寫鎖時須要判斷讀鎖和寫鎖是否都存在,有則等待,沒有則將寫鎖數量加1 public synchronized void lockWriters() throws InterruptedException{ writeRequest++; //寫請求+1 while (GetReadCount() > 0 || GetWriteCount() > 0) { wait(); } writeRequest--; //獲取寫鎖後寫請求-1 System.out.println("lockWriters ---" + Thread.currentThread().getName()); state++; } //6. 釋放寫鎖數量減1,通知全部等待線程 public synchronized void unlockWriters(){ state--; notifyAll(); } }
你們能夠測試下上面的代碼看看是否是寫請求都優先執行了呢?如今咱們把這個問題放到ReentrantReadWriteLock中來考慮,顯而易見,ReentrantReadWriteLock也會發生寫請求飢餓的狀況,由於寫請求同樣會排隊,不論是公平鎖仍是非公平鎖,在有讀鎖的狀況下,都不能保證寫鎖必定能獲取到,這樣只要讀鎖一直佔用,就會發生寫飢餓的狀況。那麼JDK就沒有提供什麼好辦法來解決這個問題嗎?固然是有的,那就是JDK8中新增的改進讀寫鎖---StampedLock,在下一篇文章中將會對StampedLock進行詳細講解。