Java併發編程:7-ReadWriteLock

前言:java

上一篇咱們瞭解了Lock接口與Condition接口。本篇來看看J.U.C中的ReadWriteLock,再次膜拜一下Doug Lea大神的傑做。面試

面試問題
Q :談談ReadWriteLock的好處?數據庫

1.ReadWriteLock簡介

ReadWriteLock接口是在JDK5提供的,具體的實現類爲ReentrantReadWriteLock,還有一個實現類ReadWriteLockView,是StampedLock的內部類。後邊會有講到。編程

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

ReadWriteLock直譯爲讀寫鎖,從接口命名上就能夠看出該工具類用於特定的場景下,前面講過的ReentrantLock和synchronized基本能夠用來解決一切併發問題,但在特定的場景下可能表現的效果不那麼使人滿意,如在讀多寫少的時,大部分線程都在進行讀操做,不多有線程會修改共享數據。但因爲加鎖的特性,致使大量的讀操做進行了沒必要要的鎖競爭,若是能將讀寫的鎖分離,有寫操做的時候,進行讀操做須要加鎖;沒有寫操做的時候,能夠多個線程同時進行讀操做。這樣勢必會提高性能。segmentfault

讀寫鎖即是解決這種場景問題的。讀寫鎖有三個基本原則:緩存

  1. 容許多個線程同時讀共享變量
  2. 只容許一個線程寫共享變量
  3. 若是一個寫線程正在執行,此時禁止讀線程讀共享變量,若是一個線程在讀,一樣也禁止寫共享變量。

2.ReentrantReadWriteLock使用

2.1 鎖降級

ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock()
    Lock writeLock = readWriteLock.writeLock()

能夠看出不管是讀鎖仍是寫鎖都是Lock接口的實現類,那麼上一篇中提到Lock接口的三種加鎖方式均可以使用。併發

//支持中斷的加鎖
void lockInterruptibly() throws InterruptedException;
//支持超時的加鎖
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//支持非阻塞獲取鎖
boolean tryLock();

Reentrant表明可重入的,ReentrantReadWriteLock支持重入鎖,並且也支持公平鎖和非公平鎖。框架

前面簡單的介紹的讀寫鎖的使用,這裏有一個須要注意的點,就是讀寫鎖的升級和降級。工具

ReentrantReadWriteLock不支持鎖的升級,可是支持鎖的降級。鎖降級就是持有寫鎖去申請讀鎖;鎖升級是持有讀鎖去申請寫鎖,若是出現相似鎖升級的代碼,則會致使線程阻塞,且沒法被喚醒。這點須要注意。post

鎖的升級:

ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();  
    @Test
    //沒法進行鎖的升級,從讀變成寫
    public void test2() throws InterruptedException {
        Thread thread = new Thread(() -> {
            readWriteLock.readLock().lock();
            System.out.println("獲取讀鎖");
            
            readWriteLock.writeLock().lock();
            System.out.println("獲取寫鎖");
            
            readWriteLock.writeLock().unlock();
            System.out.println("釋放寫鎖");
            
            readWriteLock.readLock().unlock();
            System.out.println("釋放讀鎖");
        });
        thread.start();
        thread.join();
    }
    /*Output
        獲取讀鎖
        ----- 發生阻塞----- 必須先釋放讀鎖才能去申請寫鎖,否則會阻塞

鎖的降級:

ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    @Test
    //進行鎖的降級,從寫變成讀
    public void test() throws InterruptedException {
        Thread thread = new Thread(() -> {
            readWriteLock.writeLock().lock();
            System.out.println("獲取寫鎖");
            
            readWriteLock.readLock().lock();
            System.out.println("獲取讀鎖");
            
            readWriteLock.readLock().unlock();
            System.out.println("釋放寫鎖");
            
            readWriteLock.writeLock().unlock();
            System.out.println("釋放讀鎖"); 
        });
        thread.start();
        thread.join();
    }
    /*Output
        獲取寫鎖
        獲取讀鎖
        釋放寫鎖
        釋放讀鎖
    */

鎖降級仍是有不少應用場景的。好比有業務須要先查緩存,發現緩存失效須要從新去數據庫查詢數據並修改緩存,完成修改操做後應該儘快釋放寫鎖,減少鎖的粒度。這樣能讓更多的讀線程儘快訪問到修改後的數據。否則業務邏輯半天執行不完,這期間儘管緩存數據是最新的,可是因爲寫鎖未釋放,其餘線程也沒法進行讀操做。極大的下降了併發性。

所以在完成修改緩存後去拿讀鎖,而後釋放寫鎖,這樣既能保證其餘線程讀取到最新的數據,又能保證當前線程的後續操做使用的數據是最新的。

ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    final Lock r = readWriteLock.readLock();
    final Lock w = readWriteLock.writeLock();
    volatile boolean cacheValid;

    @Test
    public void processCachedData() {
        r.lock();           //獲取讀鎖
        if (!cacheValid){   //緩存失效
            r.unlock();     //先釋放讀鎖,不容許鎖升級
            w.lock();
            try {
                if (!cacheValid){   //再次檢查緩存狀態,
//                  cache= ...
                    cacheValid =true;   //完成緩存更新
                }
                r.lock();   //釋放寫鎖前,降級爲讀鎖
            }finally {
                w.unlock(); //釋放寫鎖
            }
        }
        try{
//            ...     //執行業務邏輯
        }finally {
            r.unlock();
        }
    }

2.2 寫鎖支持條件變量

讀鎖不支持條件變量,若是讀鎖調用newCondition()會拋出UnsupportedOperationException異常;

寫鎖支持條件變量,鎖與條件變量的搭配使用能夠參考上一篇 Lock & Condition

3.ReentrantReadWriteLock原理

ReentrantReadWriteLock內部仍是使用的AQS框架,經過前面的學習咱們知道,在AQS中,經過volatile int state 來表示線程鎖的狀態,ReentrantReadWriteLock有兩把鎖:讀鎖和寫鎖,它們保護的都是同一個資源,如何用一個共享變量來區分寫鎖和讀鎖的狀態呢?答案就是按位拆分。

30-讀寫鎖的state拆分.jpg

因爲state是int類型的變量,在內存中佔用4個字節,也就是32位。將其拆分爲兩部分:高16位和低16位,其中高16位用來表示讀鎖狀態,低16位用來表示寫鎖狀態。這樣就能夠用一個int變量來表示兩種鎖的狀態,低16位寫鎖的加鎖和釋放鎖操做不會發生變化,還是state+1/state-1;但高16位的加鎖和釋放鎖就變成了state + (1<<16)/ state-(1<<16)。

一樣獲取讀鎖和寫鎖的狀態也有所不一樣:

獲取讀鎖:state >>> 16 無符號右移16位,空的地方補0。

獲取寫鎖:state & [(1 << 16) - 1] 至關於state & 0x0000FFFF 至關於把高16位置空,只保留低16位。

因爲讀鎖和寫鎖的狀態值都只佔用16位,因此讀鎖和寫鎖各自可重入鎖的最大數量爲2^16-1。

前面說了持有鎖狀態表示的問題,如今來看看其具體的實現。在此以前先了解一下ReentrantReadWriteLock的類結構。

ReentrantReadWriteLock中有兩個內部類,ReadLock和WriteLock,這兩個類在具體實現Lock接口時,分別調用ReentrantReadWriteLock中實現AQS類的同步組件Sync的共享和獨佔兩種加鎖釋放鎖方式來實現各自的功能。

Sync中實現AQS中獨佔鎖加鎖tryAcquire()和獨佔鎖釋放鎖tryRelease(),以及共享鎖的加鎖tryAcquireShared()和共享鎖的釋放鎖tryReleaseShared()。

下面的內容也是圍繞這四個方法展開。

3.1 寫鎖加鎖

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough
             * 1. 若是讀鎖或者寫鎖其中有一個不爲0,並且鎖的持有者不是線程,
             *      嘗試獲取鎖失敗,該方法返回false。
             * 2. 若是加鎖的數量滿了,返回false。
             * 3. 另外,重複獲取或者入口等待隊列容許的線程纔有資格加鎖,
             *    修改state和鎖的持有者
             */
            Thread current = Thread.currentThread();
            //返回state,高16表明讀鎖的數量,低16表明寫鎖的數量。
            int c = getState();
            //返回獨佔鎖(也就是寫鎖)的數量
            int w = exclusiveCount(c);
            //只有讀鎖和寫鎖有一個數量不爲0,state就不爲0,若是state爲0,
            //那麼沒有一個線程當前在使用讀寫鎖,能夠直接讓當前線程去拿寫鎖
            if (c != 0) {
                /*
                1.寫鎖數量爲0,且獨佔鎖不是被當前線程佔用,那麼必定是讀鎖的數量不爲0,
                  說明讀鎖在使用,嘗試獲取鎖失敗,對應前邊提到讀寫鎖的基本原則:
                  若是一個線程在讀,一樣也禁止寫共享變量。
                2.寫鎖不爲0,可是獨佔鎖的持有線程不是當前線程,說明其餘線程在使用寫鎖
                  獨佔鎖是互斥的,因此也會返回false。
                 */
if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //判斷一下重入鎖的次數會不會超過2^16-1
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                /* 這裏設置state時沒有使用CAS,緣由很簡單,能走到這裏的必然是
                     已經持有獨佔鎖的線程來重入鎖,其餘狀況沒法經過前邊的狀態判斷。
                */
                setState(c + acquires);
                return true;
            }
            /*state=0才能走到此處,writerShouldBlock()是用來判斷是否須要排隊
            非公平鎖:
                直接返回false,而後使用CAS來修改state,若是修改爲功則說明拿到鎖了
                那麼能夠繼續將獨佔鎖的持有線程設置爲當前線程。
            公平鎖:
                公平鎖會調用hasQueuedPredecessors(),這個方法是判斷入口等待隊列
                中是否有線程在等待鎖。若是沒有線程等待或者等待隊列中排在最前邊的是當前
                線程,那麼能夠繼續進行後邊的CAS操做,不然返回false。
            */
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;    //返回false的後續操做在下邊
            setExclusiveOwnerThread(current);
            return true;
        }

下面附上tryAcquire()的流程圖:
31-寫鎖加鎖流程.jpg

tryAcquire返回false的後續操做。

public final void acquire(int arg) {
                //若是tryAcquire()返回false,會繼續往下執行。
            if (!tryAcquire(arg) &&
                //這行代碼是在AQS中實現的,具體步驟爲把當前線程封裝爲一個獨佔鎖節點,
                //並加入到等待隊列中,而後將當前線程阻塞
                //在阻塞前還會當前線程還會再嘗試一次,是否能獲取到鎖,
                //此次嘗試時,當前線程已經在等待隊列中了,這個是acquireQueued()的內容。
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

3.2 寫鎖釋放

protected final boolean tryRelease(int releases) {//realeases=1
            //判斷當前線程是否持有鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
             //realease是此次調用要釋放持有的鎖的個數,好比以前重入了3次,
             //getState()返回3,releases=1,則本次釋放1次,後邊還須要再釋放2次
             //只有state爲0時纔算完全釋放鎖,獨佔鎖的全部線程纔會置爲null
            int nextc = getState() - releases;
            //檢查是不是完全釋放鎖
            boolean free = exclusiveCount(nextc) == 0;
             //是完全釋放鎖
            if (free)
                //獨佔鎖的全部線程纔會置爲null
                setExclusiveOwnerThread(null);
             //一樣沒用CAS,由於 if (!isHeldExclusively()),只有當前線程能夠經過
            setState(nextc);
            return free;
        }

3.3 讀鎖加鎖

protected final int tryAcquireShared(int unused) {
          Thread current = Thread.currentThread();
            int c = getState();
            //若是不是當前線程佔用寫鎖則會返回-1,表示共享鎖加鎖失敗
            //若是是當前線程佔用寫鎖,再申請讀鎖,則是被容許的,這是鎖降級的過程。
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //獲取讀鎖的數量
            int r = sharedCount(c);
            //這裏又是公平鎖和非公平的一個區別。
            if (!readerShouldBlock() &&        //判斷是否須要阻塞
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) { //讀鎖+1
                //r=0,表明讀鎖沒被佔用,下邊的操做對應寫(獨佔)鎖的話則是
                //設置寫鎖的持有者爲當前線程,可是讀鎖是共享的,因此不能這樣設置
                if (r == 0) {
                    /*
                    共享鎖會爲每一個獲取ReadLock的線程建立一個HoldCounter來記錄該線程
                    的線程ID和獲取ReadLock的次數(包括重入)。並將這個HoldCounter對象
                    保存在線程本身的ThreadLocal中。
                    ThreadLocalHoldCounter readHolds;
                    HoldCounter cachedHoldCounter;
                    */
/*
                    static final class ThreadLocalHoldCounter
                        extends ThreadLocal<HoldCounter> {
                        public HoldCounter initialValue() {
                            return new HoldCounter();
                        }
                    }

                    static final class HoldCounter {
                        int count = 0;
                        // Use id, not reference, to avoid garbage retention
                        final long tid = getThreadId(Thread.currentThread());
                  }
                   
                    設計者考慮到有些場景只有一個線程獲取讀鎖,那麼使用ThreadLocal
                    反而會下降性能,因此在ReentrantReadWriteLock中定義了:
                    private transient Thread firstReader = null;
                    private transient int firstReaderHoldCount;
                    來提供只有一個線程獲取讀鎖的性能保障。
                    */
                    firstReader = current;
                    firstReaderHoldCount = 1;
} else if (firstReader == current) {
                        //當前線程重入讀鎖
                        firstReaderHoldCount++;
                    } else {
                        //多個線程來申請讀鎖時會到這一步,默認會從cachedHoldCounter中拿
                        HoldCounter rh = cachedHoldCounter;
                        //若是rh.tid == getThreadId(current),說明這個線程連續兩次來拿讀鎖
                        //若是不等的話,則說明緩存失效了,須要從新從ThreadLocal取出HoldCounter
                        //順便修改緩存
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        //上一次拿讀鎖的是別的線程,這個線程是第一次來拿讀鎖
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    //獲取讀鎖成功
                    return 1;
                }
                //在readerShouldBlock()返回true時,或者CAS修改失敗時走到這裏
                //在這個方法中會用自旋的方式一直獲取讀鎖,中途寫鎖被其餘線程持有會返回-1
                return fullTryAcquireShared(current);
            }

下面附上tryAcquireShared()的流程圖:

32-讀鎖加鎖流程.jpg

3.4 讀鎖釋放

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            /*
            前邊介紹了firstReader、firstReaderHoldCount
            和cachedHoldCounter、readHolds,這裏就很容易理解了
            減小線程持有讀鎖個數,若是完全釋放讀鎖,那麼還會將
            firstReader或readHolds置空。
            */
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            //這裏是自旋CAS釋放讀鎖的操做, 由於可能其餘的線程此刻也在進行 release操做
            for (;;) {
                int c = getState();
                //讀鎖是高16位,因此這裏經過c-(1<<16)來釋放讀鎖
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    /*                    
                    這裏是判斷讀鎖有沒有完全被釋放,若是徹底釋放,後邊則會以傳播的方式
                    喚醒後繼節點中共享類型的節點,成功獲取讀鎖時也會執行喚醒後續共享類型節點
                    */
                    return nextc == 0;
            }
        }

前面提到的四個方法,是Doug Lea大神留給咱們的主要的發揮空間,AQS中其餘核心方法都沒法重寫。若是須要實現自定義的同步組件,那麼對這四個方法必然要深刻理解,而後根據組件特性來實現獨佔鎖或共享鎖。如ReentrantLock是獨佔鎖,因此其內部只實現了tryAcquire()和tryRelease()。所以本篇着重介紹了ReentrantReadWriteLock實現AQS的具體細節,沒有從AQS框架總體上展開,有些地方可能不太好理解,還但願你們多多諒解。

4.總結

在ReentrantReadWriteLock中實現了獨佔鎖和共享鎖兩種方式,讀鎖是共享的,能夠在沒有寫鎖的時候被多個線程同時持有,併發地讀數據;寫鎖是獨佔的,每次只能被一個線程持有,其餘線程要想修改共享數據,則須要排隊等待。得到了讀鎖的線程可以看到前一個釋放的寫鎖所更新的內容。

理論上,讀寫鎖比互斥鎖容許對於共享數據更大程度的併發。與互斥鎖相比,讀寫鎖是否可以提升性能取決於讀寫數據的頻率、讀取和寫入操做的持續時間以及讀線程和寫線程之間的競爭。

ReentrantReadWriteLock不支持鎖的升級,可是支持鎖的降級

ReentrantReadWriteLock的讀鎖不支持條件變量,但寫鎖支持條件變量

Reference

  《Java 併發編程實戰》
  《Java 編程思想(第4版)》
  https://juejin.im/post/5dc229...
  http://www.tianxiaobo.com

感謝閱讀

相關文章
相關標籤/搜索