讀寫鎖ReentrantReadWriteLock實現接口ReadWriteLock,該接口維護了一對相關的鎖,一個用於只讀操做,另外一個用於寫入操做。只要沒有 writer,讀取鎖能夠由多個 reader 線程同時保持。寫入鎖是獨佔的。數組
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
/** 內部類 讀鎖 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 內部類 寫鎖 */ private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; /** 使用默認(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock() { this(false); } /** 使用給定的公平策略建立一個新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } /** 返回用於寫入操做的鎖 */ public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } /** 返回用於讀取操做的鎖 */ public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer { /** * 省略其他源代碼 */ } public static class WriteLock implements Lock, java.io.Serializable{ /** * 省略其他源代碼 */ } public static class ReadLock implements Lock, java.io.Serializable { /** * 省略其他源代碼 */ }
在ReentrantLock中使用一個int類型的state來表示同步狀態,該值表示鎖被一個線程重複獲取的次數。可是讀寫鎖ReentrantReadWriteLock內部維護着兩個一對鎖,須要用一個變量維護多種狀態。因此讀寫鎖採用「按位切割使用」的方式來維護這個變量,將其切分爲兩部分,高16爲表示讀,低16爲表示寫。分割以後,讀寫鎖是如何迅速肯定讀鎖和寫鎖的狀態呢?經過爲運算。假如當前同步狀態爲S,那麼寫狀態等於 S & 0x0000FFFF(將高16位所有抹去),讀狀態等於S >>> 16(無符號補0右移16位)。代碼以下:併發
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //當前鎖個數 int c = getState(); //寫鎖 int w = exclusiveCount(c); if (c != 0) { //c != 0 && w == 0 表示存在讀鎖 //當前線程不是已經獲取寫鎖的線程 if (w == 0 || current != getExclusiveOwnerThread()) return false; //超出最大範圍 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } //是否須要阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //設置獲取鎖的線程爲當前線程 setExclusiveOwnerThread(current); return true; }
該方法和ReentrantLock的tryAcquire(int arg)大體同樣,在判斷重入時增長了一項條件:讀鎖是否存在。由於要確保寫鎖的操做對讀鎖是可見的,若是在存在讀鎖的狀況下容許獲取寫鎖,那麼那些已經獲取讀鎖的其餘線程可能就沒法感知當前寫線程的操做。所以只有等讀鎖徹底釋放後,寫鎖纔可以被當前線程所獲取,一旦寫鎖獲取了,全部其餘讀、寫線程均會被阻塞。dom
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
寫鎖的釋放最終仍是會調用AQS的模板方法release(int arg)方法,該方法首先調用tryRelease(int arg)方法嘗試釋放鎖,tryRelease(int arg)方法爲讀寫鎖內部類Sync中定義了,以下:性能
protected final boolean tryRelease(int releases) { //釋放的線程不爲鎖的持有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //若寫鎖的新線程數爲0,則將鎖的持有者設置爲null boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
寫鎖釋放鎖的整個過程和獨佔鎖ReentrantLock類似,每次釋放均是減小寫狀態,當寫狀態爲0時表示 寫鎖已經徹底釋放了,從而等待的其餘線程能夠繼續訪問讀寫鎖,獲取同步狀態,同時這次寫線程的修改對後續的線程可見。ui
public void lock() { sync.acquireShared(1); }
Sync的acquireShared(int arg)定義在AQS中:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcqurireShared(int arg)嘗試獲取讀同步狀態,該方法主要用於獲取共享式同步狀態,獲取成功返回 >= 0的返回結果,不然返回 < 0 的返回結果。
protected final int tryAcquireShared(int unused) { //當前線程 Thread current = Thread.currentThread(); int c = getState(); //exclusiveCount(c)計算寫鎖 //若是存在寫鎖,且鎖的持有者不是當前線程,直接返回-1 //存在鎖降級問題,後續闡述 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //讀鎖 int r = sharedCount(c); /* * readerShouldBlock():讀鎖是否須要等待(公平鎖原則) * r < MAX_COUNT:持有線程小於最大數(65535) * compareAndSetState(c, c + SHARED_UNIT):設置讀取鎖狀態 */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { /* * holdCount部分後面講解 */ if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //鎖降級 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } //讀鎖須要阻塞 else if (readerShouldBlock()) { //列頭爲當前線程 if (firstReader == current) { } //HoldCounter後面講解 else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //讀鎖超出最大範圍 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS設置讀鎖成功 if (compareAndSetState(c, c + SHARED_UNIT)) { //若是是第1次獲取「讀取鎖」,則更新firstReader和firstReaderHoldCount if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } //若是想要獲取鎖的線程(current)是第1個獲取鎖(firstReader)的線程,則將firstReaderHoldCount+1 else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //更新線程的獲取「讀取鎖」的共享計數 rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
fullTryAcquireShared(Thread current)會根據「是否須要阻塞等待」,「讀取鎖的共享計數是否超過限制」等等進行處理。若是不須要阻塞等待,而且鎖的共享計數沒有超過限制,則經過CAS嘗試獲取鎖,並返回1
public void unlock() { sync.releaseShared(1); }
unlcok()方法內部使用Sync的releaseShared(int arg)方法,該方法定義在AQS中:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
調用tryReleaseShared(int arg)嘗試釋放讀鎖,該方法定義在讀寫鎖的Sync內部類中:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //若是想要釋放鎖的線程爲第一個獲取鎖的線程 if (firstReader == current) { //僅獲取了一次,則須要將firstReader 設置null,不然 firstReaderHoldCount - 1 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } //獲取rh對象,並更新「當前線程獲取鎖的信息」 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //CAS更新同步狀態 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
ReadWriteLock rtLock = new ReentrantReadWriteLock(); rtLock.readLock().lock(); System.out.println("get readLock."); rtLock.writeLock().lock(); System.out.println("blocking");
ReadWriteLock rtLock = new ReentrantReadWriteLock(); rtLock.writeLock().lock(); System.out.println("writeLock"); rtLock.readLock().lock(); System.out.println("get read lock");
public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for(int i=0;i<3;i++) { new Thread(){ public void run(){ while(true){ q3.get(); } } }.start(); } for(int i=0;i<3;i++) { new Thread(){ public void run(){ while(true){ q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3{ private Object data = null;//共享數據,只能有一個線程能寫該數據,但能夠有多個線程同時讀該數據。 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void get(){ System.out.println(Thread.currentThread().getName() + " be to read data!"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); } public void put(Object data){ rwl.writeLock().lock();//上寫鎖,不容許其餘線程讀也不容許寫 try { System.out.println(Thread.currentThread().getName() + " be to write data!"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); } finally { rwl.writeLock().unlock();//釋放寫鎖 } } }
Thread-0 be ready to read data! Thread-1 be ready to read data! Thread-2 be ready to read data! Thread-0have read data :null Thread-2have read data :null Thread-1have read data :null Thread-5 be ready to write data! Thread-5 have write data: 6934 Thread-5 be ready to write data! Thread-5 have write data: 8987 Thread-5 be ready to write data! Thread-5 have write data: 8496
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class CacheDemo { /** * 緩存器,這裏假設須要存儲1000左右個緩存對象,按照默認的負載因子0.75,則容量=750,大概估計每個節點鏈表長度爲5個 * 那麼數組長度大概爲:150,又有雨設置map大小通常爲2的指數,則最近的數字爲:128 */ private Map<String, Object> map = new HashMap<>(128); private ReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { } public Object get(String id){ Object value = null; rwl.readLock().lock();//首先開啓讀鎖,從緩存中去取 try{ value = map.get(id); if(value == null){ //若是緩存中沒有釋放讀鎖,上寫鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try{ if(value == null){ //防止多寫線程重複查詢賦值 value = "redis-value"; //此時能夠去數據庫中查找,這裏簡單的模擬一下 } rwl.readLock().lock(); //加讀鎖降級寫鎖,不明白的能夠查看上面鎖降級的原理與保持讀取數據原子性的講解 }finally{ rwl.writeLock().unlock(); //釋放寫鎖 } } }finally{ rwl.readLock().unlock(); //最後釋放讀鎖 } return value; } }