說到ReentrantReadWriteLock,首先要作的是與ReentrantLock劃清界限。它和後者都是單獨的實現,彼此之間沒有繼承或實現的關係。 html
ReentrantLock 實現了標準的互斥操做,也就是一次只能有一個線程持有鎖,也即所謂獨佔鎖的概念。前面的章節中一直在強調這個特色。顯然這個特色在必定程度上面減低了吞吐量,實際上獨佔鎖是一種保守的鎖策略,在這種狀況下任何「讀/讀」,「寫/讀」,「寫/寫」操做都不能同時發生。可是一樣須要強調的一個概念是,鎖是有必定的開銷的,當併發比較大的時候,鎖的開銷就比較客觀了。因此若是可能的話就儘可能少用鎖,非要用鎖的話就嘗試看可否改造爲讀寫鎖。 java
ReadWriteLock 描述的是:一個資源可以被多個讀線程訪問,或者被一個寫線程訪問,可是不能同時存在讀寫線程。也就是說讀寫鎖使用的場合是一個共享資源被大量讀取操做,而只有少許的寫操做(修改數據)。清單0描述了ReadWriteLock的API。 緩存
清單0 ReadWriteLock 接口 併發
- public interface ReadWriteLock {
- Lock readLock();
- Lock writeLock();
- }
清單0描述的ReadWriteLock結構,這裏須要說明的是ReadWriteLock並非Lock的子接口,只不過ReadWriteLock藉助Lock來實現讀寫兩個視角。在ReadWriteLock中每次讀取共享數據就須要讀取鎖,當須要修改共享數據時就須要寫入鎖。看起來好像是兩個鎖,但其實不盡然,下文會指出。 app
ReentrantReadWriteLock有如下幾個特性: 性能
ReentrantReadWriteLock裏面的鎖主體就是一個Sync,也就是上面提到的FairSync或者NonfairSync,因此說實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不同,因此前面纔有讀寫鎖是獨佔鎖的兩個不一樣視圖一說。 ui
ReentrantReadWriteLock裏面有兩個類:ReadLock/WriteLock,這兩個類都是Lock的實現。 spa
清單1 ReadLock 片斷 .net
- public static class ReadLock implements Lock, java.io.Serializable {
- private final Sync sync;
- protected ReadLock(ReentrantReadWriteLock lock) {
- sync = lock.sync;
- }
- public void lock() {
- sync.acquireShared(1);
- }
- public void lockInterruptibly() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public boolean tryLock() {
- return sync.tryReadLock();
- }
- public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
- public void unlock() {
- sync.releaseShared(1);
- }
- public Condition newCondition() {
- throw new UnsupportedOperationException();
- }
- }
清單2 WriteLock 片斷 線程
- public static class WriteLock implements Lock, java.io.Serializable {
- private final Sync sync;
- protected WriteLock(ReentrantReadWriteLock lock) {
- sync = lock.sync;
- }
- public void lock() {
- sync.acquire(1);
- }
- public void lockInterruptibly() throws InterruptedException {
- sync.acquireInterruptibly(1);
- }
- public boolean tryLock( ) {
- return sync.tryWriteLock();
- }
- public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
- return sync.tryAcquireNanos(1, unit.toNanos(timeout));
- }
- public void unlock() {
- sync.release(1);
- }
- public Condition newCondition() {
- return sync.newCondition();
- }
- public boolean isHeldByCurrentThread() {
- return sync.isHeldExclusively();
- }
- public int getHoldCount() {
- return sync.getWriteHoldCount();
- }
- }
清單1描述的是讀鎖的實現,清單2描述的是寫鎖的實現。顯然WriteLock就是一個獨佔鎖,這和ReentrantLock裏面的實現幾乎相同,都是使用了AQS的acquire/release操做。固然了在內部處理方式上與ReentrantLock仍是有一點不一樣的。對比清單1和清單2能夠看到,ReadLock獲取的是共享鎖,WriteLock獲取的是獨佔鎖。
在AQS章節中介紹到AQS中有一個state字段(int類型,32位)用來描述有多少線程獲持有鎖。在獨佔鎖的時代這個值一般是0或者1(若是是重入的就是重入的次數),在共享鎖的時代就是持有鎖的數量。在上一節中談到,ReadWriteLock的讀、寫鎖是相關可是又不一致的,因此須要兩個數來描述讀鎖(共享鎖)和寫鎖(獨佔鎖)的數量。顯然如今一個state就不夠用了。因而在ReentrantReadWrilteLock裏面將這個字段一分爲二,高位16位表示共享鎖的數量,低位16位表示獨佔鎖的數量(或者重入數量)。2^16-1=65536,這就是上節中提到的爲何共享鎖和獨佔鎖的數量最大隻能是65535的緣由了。
有了上面的知識後再來分析讀寫鎖的獲取和釋放就容易多了。
清單3 寫入鎖獲取片斷
- protected final boolean tryAcquire(int acquires) {
- Thread current = Thread.currentThread();
- int c = getState();
- int w = exclusiveCount(c);
- if (c != 0) {
- if (w == 0 || current != getExclusiveOwnerThread())
- return false;
- if (w + exclusiveCount(acquires) > MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- }
- if ((w == 0 && writerShouldBlock(current)) ||
- !compareAndSetState(c, c + acquires))
- return false;
- setExclusiveOwnerThread(current);
- return true;
- }
清單3 是寫入鎖獲取的邏輯片斷,整個工做流程是這樣的:
清單3 中 exclusiveCount(c)就是獲取寫線程數(包括重入數),也就是state的低16位值。另外這裏有一段邏輯是當前寫線程是否須要阻塞writerShouldBlock(current)。清單4 和清單5 就是公平鎖和非公平鎖中是否須要阻塞的片斷。很顯然對於非公平鎖而言老是不阻塞當前線程,而對於公平鎖而言若是AQS隊列不爲空或者當前線程不是在AQS的隊列頭那麼就阻塞線程,直到隊列前面的線程處理完鎖邏輯。
清單4 公平讀寫鎖寫線程是否阻塞
- final boolean writerShouldBlock(Thread current) {
- return !isFirst(current);
- }
清單5 非公平讀寫鎖寫線程是否阻塞
- final boolean writerShouldBlock(Thread current) {
- return false;
- }
寫入鎖的獲取邏輯清楚後,釋放鎖就比較簡單了。清單6 描述的寫入鎖釋放邏輯片斷,其實就是檢測下剩下的寫入鎖數量,若是是0就將獨佔鎖線程清空(意味着沒有線程獲取鎖),不然就是說當前是重入鎖的一次釋放,因此不能將獨佔鎖線程清空。而後將剩餘線程狀態數寫回AQS。
清單6 寫入鎖釋放邏輯片斷
- protected final boolean tryRelease(int releases) {
- int nextc = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- if (exclusiveCount(nextc) == 0) {
- setExclusiveOwnerThread(null);
- setState(nextc);
- return true;
- } else {
- setState(nextc);
- return false;
- }
- }
清單3~6 描述的寫入鎖的獲取釋放過程。讀取鎖的獲取和釋放過程要稍微複雜些。 清單7描述的是讀取鎖的獲取過程。
清單7 讀取鎖獲取過程片斷
- protected final int tryAcquireShared(int unused) {
- Thread current = Thread.currentThread();
- int c = getState();
- if (exclusiveCount(c) != 0 &&
- getExclusiveOwnerThread() != current)
- return -1;
- if (sharedCount(c) == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- if (!readerShouldBlock(current) &&
- compareAndSetState(c, c + SHARED_UNIT)) {
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != current.getId())
- cachedHoldCounter = rh = readHolds.get();
- rh.count++;
- return 1;
- }
- return fullTryAcquireShared(current);
- }
- final int fullTryAcquireShared(Thread current) {
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != current.getId())
- rh = readHolds.get();
- for (;;) {
- int c = getState();
- int w = exclusiveCount(c);
- if ((w != 0 && getExclusiveOwnerThread() != current) ||
- ((rh.count | w) == 0 && readerShouldBlock(current)))
- return -1;
- if (sharedCount(c) == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- if (compareAndSetState(c, c + SHARED_UNIT)) {
- cachedHoldCounter = rh; // cache for release
- rh.count++;
- return 1;
- }
- }
- }
讀取鎖獲取的過程是這樣的:
在清單7 中有一個對象HoldCounter,這裏暫且不提這是什麼結構和爲何存在這樣一個結構。
接下來根據清單8 咱們來看如何釋放一個讀取鎖。一樣先不理HoldCounter,關鍵的在於for循環裏面,其實就是一個不斷嘗試的CAS操做,直到修改狀態成功。前面說過state的高16位描述的共享鎖(讀取鎖)的數量,因此每次都須要減去2^16,這樣就至關於讀取鎖數量減1。實際上SHARED_UNIT=1<<16。
清單8 讀取鎖釋放過程
- protected final boolean tryReleaseShared(int unused) {
- HoldCounter rh = cachedHoldCounter;
- Thread current = Thread.currentThread();
- if (rh == null || rh.tid != current.getId())
- rh = readHolds.get();
- if (rh.tryDecrement() <= 0)
- throw new IllegalMonitorStateException();
- for (;;) {
- int c = getState();
- int nextc = c - SHARED_UNIT;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
好了,如今回頭看HoldCounter究竟是一個什麼東西。首先咱們能夠看到只有在獲取共享鎖(讀取鎖)的時候加1,也只有在釋放共享鎖的時候減1有做用,而且在釋放鎖的時候拋出了一個IllegalMonitorStateException異常。而咱們知道IllegalMonitorStateException一般描述的是一個線程操做一個不屬於本身的監視器對象的引起的異常。也就是說這裏的意思是一個線程釋放了一個不屬於本身或者不存在的共享鎖。
前面的章節中一再強調,對於共享鎖,其實並非鎖的概念,更像是計數器的概念。一個共享鎖就相對於一次計數器操做,一次獲取共享鎖至關於計數器加1,釋放一個共享鎖就至關於計數器減1。顯然只有線程持有了共享鎖(也就是當前線程攜帶一個計數器,描述本身持有多少個共享鎖或者多重共享鎖),才能釋放一個共享鎖。不然一個沒有獲取共享鎖的線程調用一次釋放操做就會致使讀寫鎖的state(持有鎖的線程數,包括重入數)錯誤。
明白了HoldCounter的做用後咱們就能夠猜到它的做用其實就是當前線程持有共享鎖(讀取鎖)的數量,包括重入的數量。那麼這個數量就必須和線程綁定在一塊兒。
在Java裏面將一個對象和線程綁定在一塊兒,就只有ThreadLocal才能實現了。因此毫無疑問HoldCounter就應該是綁定到線程上的一個計數器。
清單9 線程持有讀取鎖數量的計數器
- static final class HoldCounter {
- int count;
- final long tid = Thread.currentThread().getId();
- int tryDecrement() {
- int c = count;
- if (c > 0)
- count = c - 1;
- return c;
- }
- }
- static final class ThreadLocalHoldCounter
- extends ThreadLocal<HoldCounter> {
- public HoldCounter initialValue() {
- return new HoldCounter();
- }
- }
清單9 描述的是線程持有讀取鎖數量的計數器。能夠看到這裏使用ThreadLocal將HoldCounter綁定到當前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock裏面緩存的上一個讀取線程(cachedHoldCounter)是不是當前線程。這樣作的好處是能夠減小ThreadLocal.get()的次數,由於這也是一個耗時操做。須要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的緣由是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(儘管GC可以智能的發現這種引用而回收它們,可是這須要必定的代價),因此其實這樣作只是爲了幫助GC快速回收對象而已。
除了readLock()和writeLock()外,Lock對象還容許tryLock(),那麼ReadLock和WriteLock的tryLock()不同。清單10 和清單11 分別描述了讀取鎖的tryLock()和寫入鎖的tryLock()。
讀取鎖tryLock()也就是tryReadLock()成功的條件是:沒有寫入鎖或者寫入鎖是當前線程,而且讀線程共享鎖數量沒有超過65535個。
寫入鎖tryLock()也就是tryWriteLock()成功的條件是: 沒有寫入鎖或者寫入鎖是當前線程,而且嘗試一次修改state成功。
清單10 讀取鎖的tryLock()
- final boolean tryReadLock() {
- Thread current = Thread.currentThread();
- for (;;) {
- int c = getState();
- if (exclusiveCount(c) != 0 &&
- getExclusiveOwnerThread() != current)
- return false;
- if (sharedCount(c) == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- if (compareAndSetState(c, c + SHARED_UNIT)) {
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != current.getId())
- cachedHoldCounter = rh = readHolds.get();
- rh.count++;
- return true;
- }
- }
- }
清單11 寫入鎖的tryLock()
- final boolean tryWriteLock() {
- Thread current = Thread.currentThread();
- int c = getState();
- if (c != 0) {
- int w = exclusiveCount(c);
- if (w == 0 ||current != getExclusiveOwnerThread())
- return false;
- if (w == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- }
- if (!compareAndSetState(c, c + 1))
- return false;
- setExclusiveOwnerThread(current);
- return true;
- }
4、小結
使用ReentrantReadWriteLock能夠推廣到大部分讀,少許寫的場景,由於讀線程之間沒有競爭,因此比起sychronzied,性能好不少。
若是須要較爲精確的控制緩存,使用ReentrantReadWriteLock倒也不失爲一個方案。
參考內容來源:
ReentrantReadWriteLock http://uule.iteye.com/blog/1549707
深刻淺出 Java Concurrency (13): 鎖機制 part 8 讀寫鎖 (ReentrantReadWriteLock) (1)
http://www.blogjava.net/xylz/archive/2010/07/14/326080.html
深刻淺出 Java Concurrency (14): 鎖機制 part 9 讀寫鎖 (ReentrantReadWriteLock) (2)
http://www.blogjava.net/xylz/archive/2010/07/15/326152.html
高性能鎖ReentrantReadWriteLock
http://jhaij.iteye.com/blog/269656
JDK說明
http://www.cjsdn.net/Doc/JDK60/java/util/concurrent/locks/ReentrantReadWriteLock.html
關於concurrent包 線程池、資源封鎖和隊列、ReentrantReadWriteLock介紹
http://www.oschina.net/question/16_636