Java 併發編程 --- 讀寫鎖:ReentrantReadWriteLock

簡述

重入鎖 ReentrantLock是排他鎖,排他鎖在同一時刻僅有一個線程能夠進行訪問,但在大多數場景下,大部分時間都是提供度服務,而寫服務佔有的時間較少。然而讀服務不存在數據競爭問題,若是一個線程在讀時禁止其餘線程勢必會致使性能下降。因此就提供了讀寫鎖。java

讀寫鎖維護者一對鎖,一個讀鎖和一個寫鎖。經過分離讀鎖和寫鎖,使得併發性比通常的排他鎖有了較大的提高:在同一時間能夠容許多個讀線程同時訪問,可是在寫線程訪問時,全部讀線程和寫線程都會被阻塞。併發

讀寫鎖的主要特性:性能

  • 公平性:支持公平性和非公平性。
  • 重入性:支持重入。讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。
  • 鎖降級:遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖可以降級成爲讀鎖

讀寫鎖ReentrantReadWriteLock實現接口ReadWriteLock,該接口維護了一對相關的鎖,一個用於只讀操做,另外一個用於寫入操做。只要沒有 writer,讀取鎖能夠由多個 reader 線程同時保持。寫入鎖是獨佔的。ui

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

ReadWriteLock定義了兩個方法。readLock()返回用於讀操做的鎖,writeLock()返回用於寫操做的鎖。ReentrantReadWriteLock定義以下:this

/** 內部類  讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 內部類  寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    /** 使用默認(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用給定的公平策略建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /** 返回用於寫入操做的鎖 */
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    /** 返回用於讀取操做的鎖 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

	abstract static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 省略其他源代碼
         */
    }
    public static class WriteLock implements Lock, java.io.Serializable{
        /**
         * 省略其他源代碼
         */
    }

    public static class ReadLock implements Lock, java.io.Serializable {
        /**
         * 省略其他源代碼
         */
    }

ReentrantReadWriteLock與ReentrantLock同樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的。因此ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不同而已,它的讀寫鎖其實就是兩個類:ReadLock、writeLock,這兩個類都是lock實現。線程

讀寫狀態的設計

高16位表示讀,表示持有讀鎖的線程數(sharedCount)設計

低16位表示寫。寫鎖的重入次數 (exclusiveCount)code

& 運算:1 1 爲1 ,其他爲0.對象

在ReentrantLock中使用一個int類型的state來表示同步狀態,該值表示鎖被一個線程重複獲取的次數。可是讀寫鎖ReentrantReadWriteLock內部維護着兩個一對鎖,須要用一個變量維護多種狀態。因此讀寫鎖採用「按位切割使用」的方式來維護這個變量,將其切分爲兩部分,高16爲表示讀,低16爲表示寫。分割以後,讀寫鎖是如何迅速肯定讀鎖和寫鎖的狀態呢?經過爲運算。假如當前同步狀態爲S,那麼寫狀態等於 S & 0x0000FFFF(將高16位所有抹去),讀狀態等於S >>> 16(無符號補0右移16位)。代碼以下:排序

static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

寫鎖

寫鎖就是一個支持可重入的排他鎖。

寫鎖的獲取

寫鎖的獲取最終會調用tryAcquire(int arg),該方法在內部類Sync中實現:

protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        //當前鎖個數
        int c = getState();
        //寫鎖
        int w = exclusiveCount(c);
        if (c != 0) {
            //c != 0 && w == 0 表示存在讀鎖
            //當前線程不是已經獲取寫鎖的線程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //超出最大範圍
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        //是否須要阻塞
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //設置獲取鎖的線程爲當前線程
        setExclusiveOwnerThread(current);
        return true;
    }

寫鎖的獲取過程

  1. 當前線程獲取寫鎖,增長狀態便可。 2.當前線程獲取了讀鎖,或者其餘線程獲取鎖,等待。

該方法和ReentrantLock的tryAcquire(int arg)大體同樣,在判斷重入時增長了一項條件:讀鎖是否存在。由於要確保寫鎖的操做對讀鎖是可見的,若是在存在讀鎖的狀況下容許獲取寫鎖,那麼那些已經獲取讀鎖的其餘線程可能就沒法感知當前寫線程的操做。所以只有等讀鎖徹底釋放後,寫鎖纔可以被當前線程所獲取,一旦寫鎖獲取了,全部其餘讀、寫線程均會被阻塞。

寫鎖的釋放

獲取了寫鎖用完了則須要釋放,WriteLock提供了unlock()方法釋放寫鎖:

public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

寫鎖的釋放最終仍是會調用AQS的模板方法release(int arg)方法,該方法首先調用tryRelease(int arg)方法嘗試釋放鎖,tryRelease(int arg)方法爲讀寫鎖內部類Sync中定義了,以下:

protected final boolean tryRelease(int releases) {
        //釋放的線程不爲鎖的持有者
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        //若寫鎖的新線程數爲0,則將鎖的持有者設置爲null
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

寫鎖釋放鎖的整個過程和獨佔鎖ReentrantLock類似,每次釋放均是減小寫狀態,當寫狀態爲0時表示 寫鎖已經徹底釋放了,從而等待的其餘線程能夠繼續訪問讀寫鎖,獲取同步狀態,同時這次寫線程的修改對後續的線程可見。

讀鎖

讀鎖爲一個可重入的共享鎖,它可以被多個線程同時持有,在沒有其餘寫線程訪問時,讀鎖老是或獲取成功。

讀鎖的獲取

讀鎖的獲取能夠經過ReadLock的lock()方法:

public void lock() {
            sync.acquireShared(1);
        }

Sync的acquireShared(int arg)定義在AQS中:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

tryAcqurireShared(int arg)嘗試獲取讀同步狀態,該方法主要用於獲取共享式同步狀態,獲取成功返回 >= 0的返回結果,不然返回 < 0 的返回結果。

protected final int tryAcquireShared(int unused) {
        //當前線程
        Thread current = Thread.currentThread();
        int c = getState();
        //exclusiveCount(c)計算寫鎖
        //若是存在寫鎖,且鎖的持有者不是當前線程,直接返回-1
        //存在鎖降級問題,後續闡述
        if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
            return -1;
        //讀鎖
        int r = sharedCount(c);

        /*
         * readerShouldBlock():讀鎖是否須要等待(公平鎖原則)
         * r < MAX_COUNT:持有線程小於最大數(65535)
         * compareAndSetState(c, c + SHARED_UNIT):設置讀取鎖狀態
         */
        if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
            /*
             * holdCount部分後面講解
             */
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

說明: readHolds 保存的是當前線程的重入次數。 firstReader :第一個獲取讀鎖的線程是單獨存放的,提升效率避免查找 readHolds。

讀鎖的獲取過程。 1.持有寫鎖的當前線程能夠獲取讀鎖,其他的都不能夠,返回-1. 2.嘗試獲取讀鎖,設置讀鎖的獲取次數。

讀鎖獲取的過程相對於獨佔鎖而言會稍微複雜下,整個過程以下:

  • 由於存在鎖降級狀況,若是存在寫鎖且鎖的持有者不是當前線程則直接返回失敗,不然繼續

  • 依據公平性原則,判斷讀鎖是否須要阻塞,讀鎖持有線程數小於最大值(65535),且設置鎖狀態成功,執行如下代碼(對於HoldCounter下面再闡述),並返回1。若是不知足改條件,執行fullTryAcquireShared()。

    fullTryAcquireShared: // 獲取讀鎖失敗,放到循環裏重試。

final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            //鎖降級
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            }
            //讀鎖須要阻塞
            else if (readerShouldBlock()) {
                //列頭爲當前線程
                if (firstReader == current) {
                }
                //HoldCounter後面講解
                else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            //讀鎖超出最大範圍
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //CAS設置讀鎖成功
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                //若是是第1次獲取「讀取鎖」,則更新firstReader和firstReaderHoldCount
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                }
                //若是想要獲取鎖的線程(current)是第1個獲取鎖(firstReader)的線程,則將firstReaderHoldCount+1
                else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    //更新線程的獲取「讀取鎖」的共享計數
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

循環裏重試的過程:

    1. 是否有鎖降級,不然返回 -1.
    1. 寫鎖空閒,且公平策略決定:線程應當阻塞。1.若是已經獲取了讀鎖,不處理。2.沒有獲取,則阻塞。
  • 3.寫鎖空閒,且公平策略決定:線程能夠獲取鎖。若是沒有達到最大值,則獲取能夠成功。繼續鎖成功的處理。

fullTryAcquireShared(Thread current)會根據「是否須要阻塞等待」,「讀取鎖的共享計數是否超過限制」等等進行處理。若是不須要阻塞等待,而且鎖的共享計數沒有超過限制,則經過CAS嘗試獲取鎖,並返回1

讀鎖的釋放

與寫鎖相同,讀鎖也提供了unlock()釋放讀鎖:

public void unlock() {
            sync.releaseShared(1);
        }

unlcok()方法內部使用Sync的releaseShared(int arg)方法,該方法定義在AQS中:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

調用tryReleaseShared(int arg)嘗試釋放讀鎖,該方法定義在讀寫鎖的Sync內部類中:

protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //若是想要釋放鎖的線程爲第一個獲取鎖的線程
        if (firstReader == current) {
            //僅獲取了一次,則須要將firstReader 設置null,不然 firstReaderHoldCount - 1
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        }
        //獲取rh對象,並更新「當前線程獲取鎖的信息」
        else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //CAS更新同步狀態
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

HoldCounter

在讀鎖獲取鎖和釋放鎖的過程當中,咱們一直均可以看到一個變量rh (HoldCounter ),該變量在讀鎖中扮演着很是重要的做用。 咱們瞭解讀鎖的內在機制其實就是一個共享鎖,爲了更好理解HoldCounter ,咱們暫且認爲它不是一個鎖的機率,而至關於一個計數器。一次共享鎖的操做就至關於在該計數器的操做。獲取共享鎖,則該計數器 + 1,釋放共享鎖,該計數器 - 1。只有當線程獲取共享鎖後才能對共享鎖進行釋放、重入操做。因此HoldCounter的做用就是當前線程持有共享鎖的數量,這個數量必需要與線程綁定在一塊兒,不然操做其餘線程鎖就會拋出異常。咱們先看HoldCounter的定義:

static final class HoldCounter {
            int count = 0;
            final long tid = getThreadId(Thread.currentThread());
        }
相關文章
相關標籤/搜索