JUC鎖框架——ReadWriteLock

ReadWriteLock簡單介紹

ReadWriteLock管理一組鎖,一個是隻讀的鎖,一個是寫鎖。讀鎖能夠在沒有寫鎖的時候被多個線程同時持有,寫鎖是獨佔的。相對於互斥鎖而言,ReadWriteLoc容許更高的併發量。java

全部讀寫鎖的實現必須確保寫操做對讀操做的內存影響。換句話說,一個得到了讀鎖的線程必須能看到前一個釋放的寫鎖所更新的內容。緩存

ReadWriteLock接口

public interface ReadWriteLock {
    Lock readLock();//獲取讀鎖
    Lock writeLock();//獲取寫鎖
}

ReentrantReadWriteLock實現類

ReetrantReadWriteLock實現了ReadWriteLock接口並添加了可重入的特性。併發

  1. 鎖的獲取獲取模式
    • 非公平模式(默認):讀鎖和寫鎖的獲取的順序是不肯定的。非公平鎖主張競爭獲取,比公平鎖有更高的吞吐量。
    • 公平模式:線程將會以隊列的順序獲取鎖。噹噹前線程釋放鎖後,等待時間最長的寫/讀鎖線程就會被分配寫/讀鎖;當有寫線程持有寫鎖或者有等待的寫線程時,一個嘗試獲取公平的讀鎖(非重入)的線程就會阻塞。這個線程直到等待時間最長的寫鎖得到鎖後並釋放掉鎖後才能獲取到讀鎖。
  2. 可重入:容許讀鎖可寫鎖可重入。寫鎖能夠得到讀鎖,讀鎖不能得到寫鎖。
  3. 鎖降級:容許寫鎖下降爲讀鎖。
  4. 中斷鎖的獲取:在讀鎖和寫鎖的獲取過程當中支持中斷。
  5. 支持Condition:寫鎖提供Condition實現。
  6. 監控:提供肯定鎖是否被持有等輔助方法

鎖下降的簡單就示例

class ReadWriteLockTest {
    String data;//緩存中的對象
    volatile boolean cacheValid;//緩存是否還有效
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() throws InterruptedException {
        rwl.readLock().lock();
        if (!cacheValid) {//緩存失效須要再次讀取緩存
            //在讀取緩存前,必須釋放讀鎖
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                //再次檢查是否須要再次讀取緩存,若是須要則
                if (!cacheValid) {
                    data = Thread.currentThread().getName()+":緩存數據測試,實際開發多是一個對象!";
                    cacheValid = true;
                }
                //在釋放以前,經過獲取讀鎖降級寫鎖
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); //釋放寫鎖,持有讀鎖
            }
        }
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+"【"+data+"】");
        } finally {
            rwl.readLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
        for(int i=0;i<10;i++){
           Thread thread = new Thread(()->{
               try {
                   readWriteLockTest.processCachedData();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           });
           thread.start();
        }
    }
}

源碼分析

構造方法

public ReentrantReadWriteLock() {
    this(false);//默認爲非公平模式
}

public ReentrantReadWriteLock(boolean fair) {
    //決定了Sync是FairSync仍是NonfairSync。Sync繼承了AbstractQueuedSynchronizer,而Sync是一個抽象類,NonfairSync和FairSync繼承了Sync,並重寫了其中的抽象方法。
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

獲取鎖

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

Sync分析

FairSync

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

writerShouldBlock和readerShouldBlock方法都表示當有別的線程也在嘗試獲取鎖時,是否應該阻塞。app

對於公平模式,hasQueuedPredecessors()方法表示前面是否有等待線程。一旦前面有等待線程,那麼爲了遵循公平,當前線程也就應該被掛起。工具

NonfairSync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 寫線程無需阻塞
        }
        final boolean readerShouldBlock() {
            //apparentlyFirstQueuedIsExclusive在當前線程是寫鎖佔用的線程時,返回true;不然返回false。也就說明,若是當前有一個寫線程正在寫,那麼該讀線程應該阻塞。
            return apparentlyFirstQueuedIsExclusive();
        }
    }

ReentrantReadWriteLock中的state

繼承AQS的類都須要使用state變量表明某種資源,ReentrantReadWriteLock中的state表明了讀鎖的數量和寫鎖的持有與否,整個結構以下: 能夠看到state的高16位表明讀鎖的個數;低16位表明寫鎖的狀態。源碼分析

獲取鎖

讀鎖的獲取

public void lock() {
    sync.acquireShared(1);
}

讀鎖使用的是AQS的共享模式,AQS的acquireShared方法以下:測試

if (tryAcquireShared(arg) < 0)
   doAcquireShared(arg);

當tryAcquireShared()方法小於0時,那麼會執行doAcquireShared方法將該線程加入到等待隊列中。ui

Sync實現了tryAcquireShared方法,以下:this

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            //若是當前有寫線程而且本線程不是寫線程,不符合重入,失敗.
            //在獲取讀鎖時,若是有寫線程,則獲取失敗
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //獲得讀鎖的個數
            int r = sharedCount(c);
            //若是讀不該該阻塞而且讀鎖的個數小於最大值65535,而且能夠成功更新狀態值,成功
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {//若是當前讀鎖爲0
                    firstReader = current;//第一個讀線程就是當前線程
                    firstReaderHoldCount = 1;//第一個線程持有讀鎖的個數
                }
                //若是當前線程重入了,記錄firstReaderHoldCount
                else if (firstReader == current) {
                    firstReaderHoldCount++;
                }
                //當前讀線程和第一個讀線程不一樣,記錄每個線程讀的次數
                else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //不然,循環嘗試
            return fullTryAcquireShared(current);
        }

從上面的代碼以及註釋能夠看到,分爲三步:線程

  1. 若是當前有寫線程而且本線程不是寫線程,那麼失敗,返回-1
  2. 不然,說明當前沒有寫線程或者本線程就是寫線程(可重入),接下來判斷是否應該讀線程阻塞而且讀鎖的個數是否小於最小值,而且CAS成功使讀鎖+1,成功,返回1。其他的操做主要是用於計數的
  3. 若是2中失敗了,失敗的緣由有三,第一是應該讀線程應該阻塞;第二是由於讀鎖達到了上線;第三是由於CAS失敗,有其餘線程在併發更新state,那麼會調動fullTryAcquireShared方法。

fullTryAcquiredShared方法

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                //一旦有別的線程得到了寫鎖,而且得到寫鎖的線程不是本線程,返回-1,失敗
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                }
                //若是讀線程須要阻塞
                else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    }
                    //說明有別的讀線程佔有了鎖
                    else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //若是讀鎖達到了最大值,拋出異常
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //若是成功更改狀態,成功返回
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

寫鎖的獲取

public void lock() {
    sync.acquire(1);
}

AQS的acquire方法以下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

從上面能夠看到,寫鎖使用的是AQS的獨佔模式。首先嚐試獲取鎖,若是獲取失敗,那麼將會把該線程加入到等待隊列中。

Sync實現了tryAcquire方法用於嘗試獲取一把鎖,以下:

protected final boolean tryAcquire(int acquires) {
             //獲得調用lock方法的當前線程
            Thread current = Thread.currentThread();
            int c = getState();
            //獲得寫鎖的個數
            int w = exclusiveCount(c);
            //若是當前有寫鎖或者讀鎖.(對於讀鎖而言,若是當前寫線程能夠進行寫操做,那麼讀線程讀到的數據可能有誤)
            if (c != 0) {
                // 若是寫鎖爲0或者當前線程不是獨佔線程(不符合重入),返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //若是寫鎖的個數超過了最大值(65535),拋出異常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 寫鎖重入,返回true
                setState(c + acquires);
                return true;
            }
            //若是當前沒有寫鎖或者讀鎖,若是寫線程應該阻塞或者CAS失敗,返回false
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //不然將當前線程置爲得到寫鎖的線程,返回true
            setExclusiveOwnerThread(current);
            return true;
        }

釋放鎖

讀鎖的釋放

ReadLock的unlock方法以下:

public void unlock() {
     sync.releaseShared(1);
 }

調用了Sync的releaseShared方法,該方法在AQS中提供,以下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

調用tryReleaseShared方法嘗試釋放鎖,若是釋放成功,調用doReleaseShared嘗試喚醒下一個節點。

AQS的子類須要實現tryReleaseShared方法,Sync中的實現以下:

protected final boolean tryReleaseShared(int unused) {
    //獲得調用unlock的線程
    Thread current = Thread.currentThread();
    //若是是第一個得到讀鎖的線程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    }
    //不然,是HoldCounter中計數-1
    else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //死循環
    for (;;) {
        int c = getState();
        //釋放一把讀鎖
        int nextc = c - SHARED_UNIT;
        //若是CAS更新狀態成功,返回讀鎖是否等於0;失敗的話,則重試
        if (compareAndSetState(c, nextc))
            //釋放讀鎖對讀線程沒有影響,可是可能會使等待的寫線程解除掛起開始運行。因此,一旦沒有鎖了,就返回true,不然false;返回true後,那麼則須要釋放等待隊列中的線程,這時讀線程和寫線程都有可能再得到鎖。
            return nextc == 0;
    }
}

寫鎖的釋放

WriteLock的unlock方法以下:

public void unlock() {
    sync.release(1);
}

Sync的release方法使用的AQS中的,以下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {//嘗試釋放鎖
        Node h = head;
        if (h != null && h.waitStatus != 0)//若是等待隊列中有線程再等待
            unparkSuccessor(h);//將下一個線程解除掛起。
        return true;
    }
    return false;
}

Sync須要實現tryRelease方法,以下:

protected final boolean tryRelease(int releases) {
    //若是沒有線程持有寫鎖,可是仍要釋放,拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    //若是沒有寫鎖了,那麼將AQS的線程置爲null
    if (free)
        setExclusiveOwnerThread(null);
    //更新狀態
    setState(nextc);
    return free;//此處返回當且僅當free爲0時返回,若是當前是寫鎖被佔有了,只有當寫鎖的數據降爲0時才認爲釋放成功;不然失敗。由於只要有寫鎖,那麼除了佔有寫鎖的那個線程,其餘線程即不能夠得到讀鎖,也不能得到寫鎖
}

getOwner()

getOwner方法用於返回當前得到寫鎖的線程,若是沒有線程佔有寫鎖,那麼返回null。實現以下:

protected Thread getOwner() {
    return sync.getOwner();
}

能夠看到直接調用了Sync的getOwner方法,下面是Sync的getOwner方法:

final Thread getOwner() {
  // Must read state before owner to ensure memory consistency
  //若是獨佔鎖的個數爲0,說明沒有線程佔有寫鎖,那麼返回null;不然返回佔有寫鎖的線程。
  return ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread());
}

getReadLockCount()

getReadLockCount()方法用於返回讀鎖的個數,實現以下:

public int getReadLockCount() {
    return sync.getReadLockCount();
}

Sync的實現以下:

final int getReadLockCount() {
    return sharedCount(getState());
}
//要想獲得讀鎖的個數,就是看AQS的state的高16位。這和前面講過的同樣,高16位表示讀鎖的個數,低16位表示寫鎖的個數。
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

getReadHoldCount()

getReadHoldCount()方法用於返回當前線程所持有的讀鎖的個數,若是當前線程沒有持有讀鎖,則返回0。直接看Sync的實現便可:

final int getReadHoldCount() {
   //若是沒有讀鎖,天然每一個線程都是返回0
   if (getReadLockCount() == 0)
       return 0;

   //獲得當前線程
   Thread current = Thread.currentThread();
   //若是當前線程是第一個讀線程,返回firstReaderHoldCount參數
   if (firstReader == current)
       return firstReaderHoldCount;
   //若是當前線程不是第一個讀線程,獲得HoldCounter,返回其中的count
   HoldCounter rh = cachedHoldCounter;
   //若是緩存的HoldCounter不爲null而且是當前線程的HoldCounter,直接返回count
   if (rh != null && rh.tid == getThreadId(current))
       return rh.count;

   //若是緩存的HoldCounter不是當前線程的HoldCounter,那麼從ThreadLocal中獲得本線程的HoldCounter,返回計數
    int count = readHolds.get().count;
    //若是本線程持有的讀鎖爲0,從ThreadLocal中移除
    if (count == 0) readHolds.remove();
    return count;
}

從上面的代碼中,能夠看到兩個熟悉的變量,firstReader和HoldCounter類型。這兩個變量在讀鎖的獲取中接觸過,前面沒有細說,這裏細說一下。HoldCounter類的實現以下:

static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

readHolds是ThreadLocalHoldCounter類,定義以下:

static final class ThreadLocalHoldCounter
      extends ThreadLocal<HoldCounter> {
      public HoldCounter initialValue() {
          return new HoldCounter();
      }
  }

能夠看到,readHolds存儲了每個線程的HoldCounter,而HoldCounter中的count變量就是用來記錄線程得到的寫鎖的個數。因此能夠得出結論:Sync維持總的讀鎖的個數,在state的高16位;因爲讀線程能夠同時存在,因此每一個線程還保存了得到的讀鎖的個數,這個是經過HoldCounter來保存的。 除此以外,對於第一個讀線程有特殊的處理,Sync中有以下兩個變量:

private transient Thread firstReader = null;//第一個獲得讀鎖的線程
private transient int firstReaderHoldCount;//第一個線程得到的寫鎖

其他獲取到讀鎖的線程的信息保存在HoldCounter中。

看完了HoldCounter和firstReader,再來看一下getReadLockCount的實現,主要有三步:

  1. 當前沒有讀鎖,那麼天然每個線程得到的讀鎖都是0;
  2. 若是當前線程是第一個獲取到讀鎖的線程,那麼返回firstReadHoldCount;
  3. 若是當前線程不是第一個獲取到讀鎖的線程,獲得該線程的HoldCounter,而後返回其count字段。若是count字段爲0,說明該線程沒有佔有讀鎖,那麼從readHolds中移除。獲取HoldCounter分爲兩步,第一步是與cachedHoldCounter比較,若是不是,則從readHolds中獲取。

getWriteLockCount()

getWriteLockCount()方法返回寫鎖的個數,Sync的實現以下:

final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

能夠看到若是沒有線程持有寫鎖,那麼返回0;不然返回AQS的state的低16位。

總結

當分析ReentranctReadWriteLock時,或者說分析內部使用AQS實現的工具類時,須要明白的就是AQS的state表明的是什麼。ReentrantLockReadWriteLock中的state同時表示寫鎖和讀鎖的個數。爲了實現這種功能,state的高16位表示讀鎖的個數,低16位表示寫鎖的個數。AQS有兩種模式:共享模式和獨佔模式,讀寫鎖的實現中,讀鎖使用共享模式;寫鎖使用獨佔模式;另一點須要記住的即便,當有讀鎖時,寫鎖就不能得到;而當有寫鎖時,除了得到寫鎖的這個線程能夠得到讀鎖外,其餘線程不能得到讀鎖。

相關文章
相關標籤/搜索