j.u.c系列(07)---之讀寫鎖:ReentrantReadWriteLock

寫在前面

  重入鎖ReentrantLock是排他鎖,排他鎖在同一時刻僅有一個線程能夠進行訪問,可是在大多數場景下,大部分時間都是提供讀服務,而寫服務佔有的時間較少。然而讀服務不存在數據競爭問題,若是一個線程在讀時禁止其餘線程讀勢必會致使性能下降。因此就提供了讀寫鎖。java

  讀寫鎖維護着一對鎖,一個讀鎖和一個寫鎖。經過分離讀鎖和寫鎖,使得併發性比通常的排他鎖有了較大的提高:在同一時間能夠容許多個讀線程同時訪問,可是在寫線程訪問時,全部讀線程和寫線程都會被阻塞。redis

讀寫鎖的主要特性:數據庫

  1. 公平性:支持公平性和非公平性。
  2. 重入性:支持重入。讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。
  3. 鎖降級:遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖可以降級成爲讀鎖

  

  讀寫鎖ReentrantReadWriteLock實現接口ReadWriteLock,該接口維護了一對相關的鎖,一個用於只讀操做,另外一個用於寫入操做。只要沒有 writer,讀取鎖能夠由多個 reader 線程同時保持。寫入鎖是獨佔的。數組

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

   ReadWriteLock定義了兩個方法。readLock()返回用於讀操做的鎖,writeLock()返回用於寫操做的鎖。ReentrantReadWriteLock定義以下:緩存

 

    /** 內部類  讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 內部類  寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    /** 使用默認(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用給定的公平策略建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /** 返回用於寫入操做的鎖 */
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    /** 返回用於讀取操做的鎖 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 省略其他源代碼
         */
    }
    public static class WriteLock implements Lock, java.io.Serializable{
        /**
         * 省略其他源代碼
         */
    }

    public static class ReadLock implements Lock, java.io.Serializable {
        /**
         * 省略其他源代碼
         */
    }

  ReentrantReadWriteLock與ReentrantLock同樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的。因此ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不同而已,它的讀寫鎖其實就是兩個類:ReadLock、writeLock,這兩個類都是lock實現。多線程

  在ReentrantLock中使用一個int類型的state來表示同步狀態,該值表示鎖被一個線程重複獲取的次數。可是讀寫鎖ReentrantReadWriteLock內部維護着兩個一對鎖,須要用一個變量維護多種狀態。因此讀寫鎖採用「按位切割使用」的方式來維護這個變量,將其切分爲兩部分,高16爲表示讀,低16爲表示寫。分割以後,讀寫鎖是如何迅速肯定讀鎖和寫鎖的狀態呢?經過爲運算。假如當前同步狀態爲S,那麼寫狀態等於 S & 0x0000FFFF(將高16位所有抹去),讀狀態等於S >>> 16(無符號補0右移16位)。代碼以下:併發

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

 

 

寫鎖(寫鎖就是一個支持可重入的排他鎖)

寫鎖的獲取(寫鎖的獲取最終會調用tryAcquire(int arg),該方法在內部類Sync中實現:)

protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        //當前鎖個數
        int c = getState();
        //寫鎖
        int w = exclusiveCount(c);
        if (c != 0) {
            //c != 0 && w == 0 表示存在讀鎖
            //當前線程不是已經獲取寫鎖的線程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //超出最大範圍
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        //是否須要阻塞
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //設置獲取鎖的線程爲當前線程
        setExclusiveOwnerThread(current);
        return true;
    }

  該方法和ReentrantLock的tryAcquire(int arg)大體同樣,在判斷重入時增長了一項條件:讀鎖是否存在。由於要確保寫鎖的操做對讀鎖是可見的,若是在存在讀鎖的狀況下容許獲取寫鎖,那麼那些已經獲取讀鎖的其餘線程可能就沒法感知當前寫線程的操做。所以只有等讀鎖徹底釋放後,寫鎖纔可以被當前線程所獲取,一旦寫鎖獲取了,全部其餘讀、寫線程均會被阻塞。dom

寫鎖的釋放(獲取了寫鎖用完了則須要釋放,WriteLock提供了unlock()方法釋放寫鎖)

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  寫鎖的釋放最終仍是會調用AQS的模板方法release(int arg)方法,該方法首先調用tryRelease(int arg)方法嘗試釋放鎖,tryRelease(int arg)方法爲讀寫鎖內部類Sync中定義了,以下:性能

    protected final boolean tryRelease(int releases) {
        //釋放的線程不爲鎖的持有者
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        //若寫鎖的新線程數爲0,則將鎖的持有者設置爲null
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

  寫鎖釋放鎖的整個過程和獨佔鎖ReentrantLock類似,每次釋放均是減小寫狀態,當寫狀態爲0時表示 寫鎖已經徹底釋放了,從而等待的其餘線程能夠繼續訪問讀寫鎖,獲取同步狀態,同時這次寫線程的修改對後續的線程可見。ui

讀鎖(讀鎖爲一個可重入的共享鎖,它可以被多個線程同時持有,在沒有其餘寫線程訪問時,讀鎖老是或獲取成功)

讀鎖的獲取(讀鎖的獲取能夠經過ReadLock的lock()方法)

        public void lock() {
            sync.acquireShared(1);
        }

  Sync的acquireShared(int arg)定義在AQS中:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

 

  tryAcqurireShared(int arg)嘗試獲取讀同步狀態,該方法主要用於獲取共享式同步狀態,獲取成功返回 >= 0的返回結果,不然返回 < 0 的返回結果。

    protected final int tryAcquireShared(int unused) {
        //當前線程
        Thread current = Thread.currentThread();
        int c = getState();
        //exclusiveCount(c)計算寫鎖
        //若是存在寫鎖,且鎖的持有者不是當前線程,直接返回-1
        //存在鎖降級問題,後續闡述
        if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
            return -1;
        //讀鎖
        int r = sharedCount(c);

        /*
         * readerShouldBlock():讀鎖是否須要等待(公平鎖原則)
         * r < MAX_COUNT:持有線程小於最大數(65535)
         * compareAndSetState(c, c + SHARED_UNIT):設置讀取鎖狀態
         */
        if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
            /*
             * holdCount部分後面講解
             */
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

  讀鎖獲取的過程相對於獨佔鎖而言會稍微複雜下,整個過程以下:

  1. 由於存在鎖降級狀況,若是存在寫鎖且鎖的持有者不是當前線程則直接返回失敗,不然繼續
  2. 依據公平性原則,判斷讀鎖是否須要阻塞,讀鎖持有線程數小於最大值(65535),且設置鎖狀態成功,執行如下代碼(對於HoldCounter下面再闡述),並返回1。若是不知足改條件,執行fullTryAcquireShared()。
  final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            //鎖降級
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            }
            //讀鎖須要阻塞
            else if (readerShouldBlock()) {
                //列頭爲當前線程
                if (firstReader == current) {
                }
                //HoldCounter後面講解
                else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            //讀鎖超出最大範圍
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //CAS設置讀鎖成功
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                //若是是第1次獲取「讀取鎖」,則更新firstReader和firstReaderHoldCount
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                }
                //若是想要獲取鎖的線程(current)是第1個獲取鎖(firstReader)的線程,則將firstReaderHoldCount+1
                else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    //更新線程的獲取「讀取鎖」的共享計數
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

  fullTryAcquireShared(Thread current)會根據「是否須要阻塞等待」,「讀取鎖的共享計數是否超過限制」等等進行處理。若是不須要阻塞等待,而且鎖的共享計數沒有超過限制,則經過CAS嘗試獲取鎖,並返回1

讀鎖的釋放(與寫鎖相同,讀鎖也提供了unlock()釋放讀鎖)

        public void unlock() {
            sync.releaseShared(1);
        }

  unlcok()方法內部使用Sync的releaseShared(int arg)方法,該方法定義在AQS中:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

  調用tryReleaseShared(int arg)嘗試釋放讀鎖,該方法定義在讀寫鎖的Sync內部類中:

   protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //若是想要釋放鎖的線程爲第一個獲取鎖的線程
        if (firstReader == current) {
            //僅獲取了一次,則須要將firstReader 設置null,不然 firstReaderHoldCount - 1
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        }
        //獲取rh對象,並更新「當前線程獲取鎖的信息」
        else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //CAS更新同步狀態
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

鎖降級

  鎖降級:從寫鎖變成讀鎖; 

  鎖升級:從讀鎖變成寫鎖。 

  讀鎖是能夠被多線程共享的,寫鎖是單線程獨佔的。也就是說寫鎖的併發限制比讀鎖高,這可能就是升級/降級名稱的來源。

  以下代碼會產生死鎖,由於同一個線程中,在沒有釋放讀鎖的狀況下,就去申請寫鎖,這屬於鎖升級,ReentrantReadWriteLock是不支持的。

 ReadWriteLock rtLock = new ReentrantReadWriteLock();
 rtLock.readLock().lock();
 System.out.println("get readLock.");
 rtLock.writeLock().lock();
 System.out.println("blocking");

 

  ReentrantReadWriteLock支持鎖降級,以下代碼不會產生死鎖。

ReadWriteLock rtLock = new ReentrantReadWriteLock();
rtLock.writeLock().lock();
System.out.println("writeLock");

rtLock.readLock().lock();
System.out.println("get read lock");

 

  以上這段代碼雖然不會致使死鎖,但沒有正確的釋放鎖。從寫鎖降級成讀鎖,並不會自動釋放當前線程獲取的寫鎖,仍然須要顯示的釋放,不然別的線程永遠也獲取不到寫鎖。鎖降級就意味着寫鎖是能夠降級爲讀鎖的,可是須要遵循先獲取寫鎖、獲取讀鎖在釋放寫鎖的次序。注意若是當前線程先獲取寫鎖,而後釋放寫鎖,再獲取讀鎖這個過程不能稱之爲鎖降級,鎖降級必定要遵循那個次序。

  鎖降級中讀鎖的獲取釋放爲必要?確定是必要的。試想,假如當前線程A不獲取讀鎖而是直接釋放了寫鎖,這個時候另一個線程B獲取了寫鎖,那麼這個線程B對數據的修改是不會對當前線程A可見的。若是獲取了讀鎖,則線程B在獲取寫鎖過程當中判斷若是有讀鎖尚未釋放則會被阻塞,只有當前線程A釋放讀鎖後,線程B纔會獲取寫鎖成功。

 

讀寫鎖例子

public class ReadWriteLockTest {
    public static void main(String[] args) {
        final Queue3 q3 = new Queue3();
        for(int i=0;i<3;i++)
        {
            new Thread(){
                public void run(){
                    while(true){
                        q3.get();                        
                    }
                }
            }.start();
        }
        for(int i=0;i<3;i++)
        {        
            new Thread(){
                public void run(){
                    while(true){
                        q3.put(new Random().nextInt(10000));
                    }
                }            
                
            }.start();    
        }
    }
}
class Queue3{
    private Object data = null;//共享數據,只能有一個線程能寫該數據,但能夠有多個線程同時讀該數據。
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    public void get(){
            System.out.println(Thread.currentThread().getName() + " be to read data!");
            try {
                Thread.sleep((long)(Math.random()*1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "have read data :" + data);        
    }
    public void put(Object data){
        rwl.writeLock().lock();//上寫鎖,不容許其餘線程讀也不容許寫
        try {
            System.out.println(Thread.currentThread().getName() + " be to write data!");                    
            try {
                Thread.sleep((long)(Math.random()*1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.data = data;        
            System.out.println(Thread.currentThread().getName() + " have write data: " + data);                    
        } finally {
            rwl.writeLock().unlock();//釋放寫鎖    
        }
    }
}

 

Thread-0 be ready to read data!
Thread-1 be ready to read data!
Thread-2 be ready to read data!
Thread-0have read data :null
Thread-2have read data :null
Thread-1have read data :null
Thread-5 be ready to write data!
Thread-5 have write data: 6934
Thread-5 be ready to write data!
Thread-5 have write data: 8987
Thread-5 be ready to write data!
Thread-5 have write data: 8496

 

 

使用讀寫鎖模擬一個緩存器:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CacheDemo {
    /**
     * 緩存器,這裏假設須要存儲1000左右個緩存對象,按照默認的負載因子0.75,則容量=750,大概估計每個節點鏈表長度爲5個
     * 那麼數組長度大概爲:150,又有雨設置map大小通常爲2的指數,則最近的數字爲:128
     */
    private Map<String, Object> map = new HashMap<>(128);
    private ReadWriteLock rwl = new ReentrantReadWriteLock();
    public static void main(String[] args) {

    }
    public Object get(String id){
        Object value = null;
        rwl.readLock().lock();//首先開啓讀鎖,從緩存中去取
        try{
            value = map.get(id);
            if(value == null){  //若是緩存中沒有釋放讀鎖,上寫鎖
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try{
                    if(value == null){ //防止多寫線程重複查詢賦值
                        value = "redis-value";  //此時能夠去數據庫中查找,這裏簡單的模擬一下
                    }
                    rwl.readLock().lock(); //加讀鎖降級寫鎖,不明白的能夠查看上面鎖降級的原理與保持讀取數據原子性的講解
                }finally{
                    rwl.writeLock().unlock(); //釋放寫鎖
                }
            }
        }finally{
            rwl.readLock().unlock(); //最後釋放讀鎖
        }
        return value;
    }
}
相關文章
相關標籤/搜索