【Java併發編程實戰】-----「J.U.C」:ReentrantReadWriteLock

ReentrantLock實現了標準的互斥操做,也就是說在某一時刻只有有一個線程持有鎖。ReentrantLock採用這種獨佔的保守鎖直接,在必定程度上減低了吞吐量。在這種狀況下任何的「讀/讀」、「讀/寫」、「寫/寫」操做都不能同時發生。然而在實際的場景中咱們就會遇到這種狀況:有些資源併發的訪問中,它大部分時間都是執行讀操做,寫操做比較少,可是讀操做並不影響數據的一致性,若是在進行讀操做時採用獨佔的鎖機制,這樣勢必會大大下降吞吐量。因此若是可以作到讀寫分離,那就很是完美了。java

ReadWriteLock, 維護了一對相關的鎖,一個用於只讀操做,另外一個用於寫入操做。只要沒有 writer,讀取鎖能夠由多個 reader 線程同時保持。寫入鎖是獨佔的。對於ReadWriteLock而言,一個資源可以被多個讀線程訪問,或者被一個寫線程訪問,可是不能同時存在讀寫線程。也就是說讀寫鎖使用的場合是一個共享資源被大量讀取操做,而只有少許的寫操做(修改數據)。以下:編程

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

ReadWriteLock爲一個接口,他定義了兩個方法readLock、writeLock,從方法名咱們就能夠看出這兩個方法是幹嗎用的。ReentrantReadWriteLock做爲ReadWriteLock的實現類,在API文檔中詳細介紹了它的特性。緩存

(一) 公平性多線程

      1)、 非公平鎖(默認) 這個和獨佔鎖的非公平性同樣,因爲讀線程之間沒有鎖競爭,因此讀操做沒有公平性和非公平性,寫操做時,因爲寫操做可能當即獲取到鎖,因此會推遲一個或多個讀操做或者寫操做。所以非公平鎖的吞吐量要高於公平鎖。併發

       2)、 公平鎖 利用AQS的CLH隊列,釋放當前保持的鎖(讀鎖或者寫鎖)時,優先爲等待時間最長的那個寫線程分配寫入鎖,當前前提是寫線程的等待時間要比全部讀線程的等待時間要長。一樣一個線程持有寫入鎖或者有一個寫線程已經在等待了,那麼試圖獲取公平鎖的(非重入)全部線程(包括讀寫線程)都將被阻塞,直到最早的寫線程釋放鎖。若是讀線程的等待時間比寫線程的等待時間還有長,那麼一旦上一個寫線程釋放鎖,這一組讀線程將獲取鎖。dom

(二) 重入性ide

      1)、 讀寫鎖容許讀線程和寫線程按照請求鎖的順序從新獲取讀取鎖或者寫入鎖。固然了只有寫線程釋放了鎖,讀線程才能獲取重入鎖。ui

      2)、 寫線程獲取寫入鎖後能夠再次獲取讀取鎖,可是讀線程獲取讀取鎖後卻不能獲取寫入鎖。this

      3)、 另外讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。(爲什麼是65535後面介紹)spa

(三) 鎖降級

      1)、 寫線程獲取寫入鎖後能夠獲取讀取鎖,而後釋放寫入鎖,這樣就從寫入鎖變成了讀取鎖,從而實現鎖降級的特性。

(四) 鎖升級

      1)、 讀取鎖是不能直接升級爲寫入鎖的。由於獲取一個寫入鎖須要釋放全部讀取鎖,因此若是有兩個讀取鎖視圖獲取寫入鎖而都不釋放讀取鎖時就會發生死鎖。

(五) 鎖獲取中斷

      1)、 讀取鎖和寫入鎖都支持獲取鎖期間被中斷。這個和獨佔鎖一致。

(六) 條件變量

      1)、 寫入鎖提供了條件變量(Condition)的支持,這個和獨佔鎖一致,可是讀取鎖卻不容許獲取條件變量,將獲得一個UnsupportedOperationException異常。

(七) 重入數

      1)、 讀取鎖和寫入鎖的數量最大分別只能是65535(包括重入數)。

(八) 監測

      1)、 此類支持一些肯定是保持鎖仍是爭用鎖的方法。這些方法設計用於監視系統狀態,而不是同步控制。

ReentrantReadWriteLock與ReentrantLock同樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的。因此ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不一    樣而已,它的讀寫鎖其實就是兩個類:ReadLock、writeLock,這兩個類都是lock實現。

/** 讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    
    /** 寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    
    
    final Sync sync;

    /** 使用默認(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用給定的公平策略建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    
    /** 返回用於寫入操做的鎖 */
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    /** 返回用於讀取操做的鎖 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    
    public static class WriteLock implements Lock, java.io.Serializable{
        public void lock() {
            //獨佔鎖
            sync.acquire(1);
        }
        /**
         * 省略其他源代碼
         */
    }
    
    public static class ReadLock implements Lock, java.io.Serializable {
        public void lock() {
            //共享鎖
            sync.acquireShared(1);
        }
        /**
         * 省略其他源代碼
         */
    }

從上面的源代碼咱們能夠看到WriteLock就是一個獨佔鎖,readLock是一個共享鎖,他們內部都是使用AQS的acquire、release來進行操做的。可是仍是存在一些區別的。關於獨佔鎖、共享鎖,請關注前面的博客:

【Java併發編程實戰】—–「J.U.C」:ReentrantLock之二lock方法分析

【Java併發編程實戰】—–「J.U.C」:ReentrantLock之三unlock方法分析

【Java併發編程實戰】-----「J.U.C」:Semaphore

下面LZ就ReadLock、WriteLock的獲取鎖(lock)、釋放鎖(release)進行分析。

WriteLock

lock()

                  

public void lock() {
        sync.acquire(1);
    }

與ReentrantLock同樣,調用AQS的acquire():

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

第一步,寫鎖調用tryAcquire方法,該方法與ReentrantLock中的tryAcquire方法略有不一樣:

protected final boolean tryAcquire(int acquires) {
        //當前線程
        Thread current = Thread.currentThread();
        //當前鎖個數
        int c = getState();
        //寫鎖個數
        int w = exclusiveCount(c);
        
        //當前鎖個數 != 0(是否已經有線程持有鎖),線程重入
        if (c != 0) {
            //w == 0,表示寫線程數爲0
            //或者獨佔鎖不是當前線程,返回false
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            
            //超出最大範圍(65535)
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //設置鎖的線程數量
            setState(c + acquires);
            return true;
        }
        
        //是否阻塞
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        
        //設置鎖爲當前線程全部
        setExclusiveOwnerThread(current);
        return true;
    }

在tryAcquire()中有一個段代碼

int w = exclusiveCount(c);

該段代碼主要是獲取線程的數量的,在前面的特性裏面有講過讀取鎖和寫入鎖的數量最大分別只能是65535(包括重入數)。爲什麼是65535呢?在前面LZ也提到過獨佔鎖ReentrantLock中有一個state,共享鎖中也有一個state,其中獨佔鎖中的state爲0或者1若是有重入,則表示重入的次數,共享鎖中表示的持有鎖的數量。而在ReadWriteLock中則不一樣,因爲ReadWriteLock中存在兩個鎖,他們之間有聯繫可是也有差別,因此須要有兩個state來分別表示他們。因而ReentrantReadWriteLock就將state一分二位,高16位表示共享鎖的數量,低16位表示獨佔鎖的數量。2^16 – 1 = 65535。這就是前面提過的爲何讀取鎖和寫入鎖的數量最大分別只能是65535。

·   static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    /** 返回共享鎖持有線程的數量 **/
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** 返回獨佔鎖持有線程的數量 **/
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

這段代碼能夠清晰表達計算寫鎖、讀書持有線程的數量。

在上段tryAcquire方法的源代碼中,主要流程以下:

一、首先獲取c、w。而後判斷是否已經有線程持有寫鎖(c != 0),若是持有,則線程進行重入。若w == 0(寫入鎖==0)或者 current != getExclusiveOwnerThread()(鎖的持有者不是當前線程),則返回false。若是寫入鎖的數量超出最大範圍(65535),則拋出error。

二、若是當且寫線程數爲0(那麼讀線程也應該爲0,由於上面已經處理c!=0的狀況),而且當前線程須要阻塞那麼就返回失敗;若是經過CAS增長寫線程數失敗也返回失敗。

三、當c ==0或者c>0,w >0,則設置鎖的持有則爲當前線程。

unlock()

public void unlock() {
            sync.release(1);
        }

unlock()調用Sync的release():

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

在release()中首先調用tryRelease方法進行嘗試釋放鎖:

protected final boolean tryRelease(int releases) {
        //若鎖的持有者不是當前線程,拋出異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //寫鎖的新線程數
        int nextc = getState() - releases;
        //若寫鎖的新線程數爲0,則將鎖的持有者設置爲null
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        //設置寫鎖的新線程數
        setState(nextc);
        return free;
    }

寫鎖的釋放過程仍是相對而言比較簡單的:首先查看當前線程是否爲寫鎖的持有者,若是不是拋出異常。而後檢查釋放後寫鎖的線程數是否爲0,若是爲0則表示寫鎖空閒了,釋放鎖資源將鎖的持有線程設置爲null,不然釋放僅僅只是一次重入鎖而已,並不能將寫鎖的線程清空。

因爲寫鎖與獨佔鎖存在很大的類似之處,因此相同的地方,LZ再也不闡述,更多請查閱:

【Java併發編程實戰】—–「J.U.C」:ReentrantLock之二lock方法分析

【Java併發編程實戰】—–「J.U.C」:ReentrantLock之三unlock方法分析

ReadLock

lock()

讀鎖的內在機制就是共享鎖:

public void lock() {
            sync.acquireShared(1);
        }

lock方法內部調用Sync的acquireShared():

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

對於tryAquireShared():

protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        //鎖的持有線程數
        int c = getState();
        /*
         * 若是寫鎖線程數 != 0 ,且獨佔鎖不是當前線程則返回失敗,由於存在鎖降級
         */
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        //讀鎖線程數
        int r = sharedCount(c);
        /*
         * readerShouldBlock():讀鎖是否須要等待(公平鎖原則)
         * r < MAX_COUNT:持有線程小於最大數(65535)
         * compareAndSetState(c, c + SHARED_UNIT):設置讀取鎖狀態
         */
        if (!readerShouldBlock() && r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            /*
             * holdCount部分後面講解
             */
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;        //
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

讀鎖獲取鎖的過程比寫鎖稍微複雜些:

一、寫鎖的線程數C!=0且寫鎖的線程持有者不是當前線程,返回-1。由於存在鎖降級,寫線程獲取寫入鎖後能夠獲取讀取鎖。

二、依據公平性原則,判斷讀鎖是否須要阻塞,讀鎖持有線程數小於最大值(65535),且設置鎖狀態成功,執行如下代碼(對於HoldCounter下面再闡述),並返回1。若是不知足改條件,執行fullTryAcquireShared():(HoldCounter部分後面講解)

final int fullTryAcquireShared(Thread current) {
         HoldCounter rh = null;
         for (;;) {
             //鎖的線程持有數
             int c = getState();
             //若是寫鎖的線程持有數 != 0 且鎖的持有者不是當前線程,返回-1
             if (exclusiveCount(c) != 0) {
                 if (getExclusiveOwnerThread() != current)
                     return -1;
             } 
             //若讀鎖須要阻塞
             else if (readerShouldBlock()) {
                 //若隊列的頭部是當前線程
                 if (firstReader == current) {
                 } 
                 else {   //下面講解
                     if (rh == null) {
                         rh = cachedHoldCounter;
                         if (rh == null || rh.tid != current.getId()) {
                             rh = readHolds.get();
                             if (rh.count == 0)
                                 readHolds.remove();
                         }
                     }
                     if (rh.count == 0)
                         return -1;
                 }
             }
             //讀鎖的線程數到達最大值:65536,拋出異常
             if (sharedCount(c) == MAX_COUNT)
                 throw new Error("Maximum lock count exceeded");
             //設置鎖的狀態成功
             if (compareAndSetState(c, c + SHARED_UNIT)) {
                 //
                 if (sharedCount(c) == 0) {
                     firstReader = current;
                     firstReaderHoldCount = 1;
                 }//
                 else if (firstReader == current) {
                     firstReaderHoldCount++;
                 }//下面講解
                 else {
                     if (rh == null)
                         rh = cachedHoldCounter;
                     if (rh == null || rh.tid != current.getId())
                         rh = readHolds.get();
                     else if (rh.count == 0)
                         readHolds.set(rh);
                     rh.count++;
                     cachedHoldCounter = rh;
                 }
                 return 1;
             }
         }
     }

 

unlock()

public  void unlock() {
            sync.releaseShared(1);
        }

unlock調用releaseShared()方法,releaseShared()是AQS中的方法,以下:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared是ReentrantReadWriteLock中的方法:

protected final boolean tryReleaseShared(int unused) {
        //當前線程
        Thread current = Thread.currentThread();
        /*
         * HoldCounter部分後面闡述
         */
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //不斷循環,不斷嘗試CAS操做
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

在這裏一樣忽略HoldCounter,其實在該方法中最關鍵的部分在於for(;;)部分,該部分其實就是一個不斷嘗試的CAS過程,直到修狀態成功。

在讀鎖的獲取、釋放過程當中,老是會有一個對象存在着,同時該對象在獲取線程獲取讀鎖是+1,釋放讀鎖時-1,該對象就是HoldCounter。

HoldCounter

要明白HoldCounter就要先明白讀鎖。前面提過讀鎖的內在實現機制就是共享鎖,對於共享鎖其實咱們能夠稍微的認爲它不是一個鎖的概念,它更加像一個計數器的概念。一次共享鎖操做就至關於一次計數器的操做,獲取共享鎖計數器+1,釋放共享鎖計數器-1。只有當線程獲取共享鎖後才能對共享鎖進行釋放、重入操做。因此HoldCounter的做用就是當前線程持有共享鎖的數量,這個數量必需要與線程綁定在一塊兒,不然操做其餘線程鎖就會拋出異常。

先看讀鎖獲取鎖的部分:

if (r == 0) {        //r == 0,表示第一個讀鎖線程,第一個讀鎖firstRead是不會加入到readHolds中
        firstReader = current;
        firstReaderHoldCount = 1;
    } else if (firstReader == current) {    //第一個讀鎖線程重入
        firstReaderHoldCount++;    
    } else {    //非firstReader計數
        HoldCounter rh = cachedHoldCounter;        //readHoldCounter緩存
        //rh == null 或者 rh.tid != current.getId(),須要獲取rh
        if (rh == null || rh.tid != current.getId())    
            cachedHoldCounter = rh = readHolds.get();
        else if (rh.count == 0)
            readHolds.set(rh);        //加入到readHolds中
        rh.count++;        //計數+1
    }

這裏爲何要搞一個firstRead、firstReaderHoldCount呢?而不是直接使用else那段代碼?這是爲了一個效率問題,firstReader是不會放入到readHolds中的,若是讀鎖僅有一個的狀況下就會避免查找readHolds。可能就看這個代碼還不是很理解HoldCounter。咱們先看firstReader、firstReaderHoldCount的定義:

private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

這兩個變量比較簡單,一個表示線程,固然該線程是一個特殊的線程,一個是firstReader的重入計數。

HoldCounter的定義:

static final class HoldCounter {
            int count = 0;
            final long tid = Thread.currentThread().getId();
        }

在HoldCounter中僅有count和tid兩個變量,其中count表明着計數器,tid是線程的id。可是若是要將一個對象和線程綁定起來僅記錄tid確定不夠的,並且HoldCounter根本不能起到綁定對象的做用,只是記錄線程tid而已。

誠然,在java中,咱們知道若是要將一個線程和對象綁定在一塊兒只有ThreadLocal才能實現。因此以下:

static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
ThreadLocalHoldCounter繼承ThreadLocal,而且重寫了initialValue方法。

故而,HoldCounter應該就是綁定線程上的一個計數器,而ThradLocalHoldCounter則是線程綁定的ThreadLocal。從上面咱們能夠看到ThreadLocal將HoldCounter綁定到當前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock裏面緩存的上一個讀取線程(cachedHoldCounter)是不是當前線程。這樣作的好處是能夠減小ThreadLocal.get()的次數,由於這也是一個耗時操做。須要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的緣由是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(儘管GC可以智能的發現這種引用而回收它們,可是這須要必定的代價),因此其實這樣作只是爲了幫助GC快速回收對象而已(引自[1])。

示例

public class Reader implements Runnable{
    
    private PricesInfo pricesInfo;
    
    public Reader(PricesInfo pricesInfo){
        this.pricesInfo = pricesInfo;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName() + "--Price 1:" + pricesInfo.getPrice1());
            System.out.println(Thread.currentThread().getName() + "--Price 1:" + pricesInfo.getPrice2());
        }
    }

}

Writer

public class Writer implements Runnable{
    private PricesInfo pricesInfo;

    public Writer(PricesInfo pricesInfo){
        this.pricesInfo = pricesInfo;
    }
    
    @Override
    public void run() {
        for (int i=0; i<3; i++) {
            System.out.printf("Writer: Attempt to modify the prices.\n");
            pricesInfo.setPrices(Math.random()*10, Math.random()*8);
            System.out.printf("Writer: Prices have been modified.\n");
            try {
                Thread.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
}

PriceInfo

public class PricesInfo {
    private double price1;
    private double price2;
    
    private ReadWriteLock  lock;
    
    public PricesInfo(){
        price1 = 1.0;
        price2 = 2.0;
        
        lock = new ReentrantReadWriteLock();
    }
    
    public double getPrice1(){
        lock.readLock().lock();
        double value = price1;
        lock.readLock().unlock();
        return value;
    }
    
    public double getPrice2(){
        lock.readLock().lock();
        double value = price2;
        lock.readLock().unlock();
        return value;
    }
    
    public void setPrices(double price1, double price2){
        lock.writeLock().lock(); 
        this.price1 = price1;
        this.price2 = price2;
        lock.writeLock().unlock();
    }
}

Test:

public class Test {
    public static void main(String[] args) {
        PricesInfo pricesInfo = new PricesInfo();
        
        Reader[] readers = new Reader[5];
        Thread[] readerThread = new Thread[5];
        for (int i=0; i<5; i++){
            readers[i]=new Reader(pricesInfo);
            readerThread[i]=new Thread(readers[i]);
        }
        
        Writer writer=new Writer(pricesInfo);
        Thread threadWriter=new Thread(writer);
        
        for (int i=0; i<5; i++){
            readerThread[i].start();
        }
        threadWriter.start();
    }
}

 

參考資料:

一、Java多線程(十)之ReentrantReadWriteLock深刻分析

二、JUC 可重入 讀寫鎖 ReentrantReadWriteLock

相關文章
相關標籤/搜索