讀寫鎖--ReentrantReadWriteLock

讀寫鎖,對於讀操做來講是共享鎖,對於寫操做來講是排他鎖,兩種操做均可重入的一種鎖。底層也是用AQS來實現的,咱們來看一下它的結構跟代碼:
-----------------------------------------------------------------------------------------------
讀寫鎖,固然要區分讀跟寫兩種操做,所以其內部有ReadLock跟WriteLock兩種具體實現。但二者也有交互的地方,好比獲取寫鎖要判斷當前是否有線程在讀,有的話就須要等待,所以內部是使用的同一個隊列同步器。由於獲取鎖的時候,支持公平與非公平兩種方式,故而同步器的包裝類Sync也有兩個:FairSync跟NonfairSync。
從寫鎖的獲取開始:
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();monitor
    int c = getState();  // 重入鎖的計數
    int w = exclusiveCount(c); // 計數高低位拆開爲讀計數跟寫計數,計算寫計數
    if (c != 0) { // 有人在佔有鎖
        if (w == 0 || current != getExclusiveOwnerThread())  // 寫計數爲0,只有讀鎖直接返回(避免了讀鎖升級爲寫鎖) 或者  當前線程不是執行線程(執行線程可能讀也可能寫)也返回
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)  //寫鎖重入次數 > 65525,拋出異常
            throw new Error("Maximum lock count exceeded");
        setState(c + acquires);  //重入的寫線程,直接設置狀態(第6行代碼沒有return,說明當前線程是重入的寫線程(寫計數不是0,且current就是獲取鎖的線程))
        return true;
    }
    if (writerShouldBlock() ||  !compareAndSetState(c, c + acquires))  //c!=0沒有return,說明當前鎖是空着的,因此cas搶佔
        return false;
    setExclusiveOwnerThread(current); // 當前線程參數設置
    return true;
}
  這裏有個writerShouldBlock(),看一下這個是幹啥的:
  追蹤源碼能夠發現,在FairSync跟NonfairSync分別都有這個方法,nonfair中直接return false,fair中是調用的hasQueuedPredecessors(),該方法是用來判斷是否有線程排隊的,這個writerShouldBlock()就是字面上意思(讀鎖是否該阻塞(排隊)),若是是非公平鎖,直接compareAndSetState進行搶佔,搶佔不到則進入排隊掛起,公平鎖,則判斷是否有排隊的,有則本身進入排隊,而不是先進行搶佔。公平鎖就像一個老實人,先排隊,沒人排隊(本身是第一個)則本身搶座位(爲啥要搶,由於可能這時候也來了一個認爲本身是第一個的老實人),非公平鎖就像土匪,管你排不排隊,老子先搶一把座位試試,搶不到(被別的線程搶走了),被別人打臉了再去排隊。綜上,第13行的邏輯就是:對於寫鎖,當前鎖沒有被佔用,若是是非公平方式獲取,直接搶佔,失敗則直接返回,公平方式則查看是否有排隊,有則獲取失敗直接返回,無則進行搶佔。方法總體上,沒獲取到鎖返回false,獲取到了返回true,而後結合AQS的代碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&  acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //沒有獲取到,就往等待隊列添加節點,而後掛起線程
        selfInterrupt();
}
  so,整個這個獲取寫鎖的過程就是:查看有沒有人佔用鎖,若是有讀鎖,則進入隊列等待;有寫鎖,則看是不是本身的線程,是則重設state而後繼續執行,不是則進入隊列等待。tryAcquire必定要跟acquire聯合起來看,不然難以理清整個流程。
  對了,還有個exclusiveCount(int c)用來拆分高低位的方法要說一下,由於讀寫鎖要分別記錄讀跟寫被重入的次數,按照通常設計,這分兩個變量來計數就好了,但jdk就是jdk,它把這兩個用一個int變量來記錄了。方法麼,就是高低位分開計算的,高16位表示讀狀態,低16位表示寫狀態。假設同步狀態爲s,則寫狀態爲s & 0x0000FFFF,至關於高16位所有清0,同理,讀狀態則爲s >>> 16,也就是右移,至關於把低16位都移除了。固然,複雜的就是讀狀態變化的時候,不是簡單的s +1,這樣的話就加到了寫操做上,而是s + 0x00010000,把低位補上0就好了。相似的與操做、位移等等,jdk中有大量的應用,好比hashmap中肯定元素所在鏈表等操做都有應用。
寫鎖釋放代碼:

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())  //沒有寫鎖,拋異常
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;  //次數是否清0了
    if (free)  //清0 了,說明徹底釋放了
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
  釋放鎖比較簡單,不作贅述。
讀鎖獲取:

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread(); 
    int c = getState();
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 有寫鎖 且當前線程不是寫鎖線程,不能重入,失敗
        return -1;
    int r = sharedCount(c);  //向右移位,獲取讀鎖計數
    if (!readerShouldBlock() &&  r < MAX_COUNT &&  compareAndSetState(c, c + SHARED_UNIT)) { // 若無需排隊  && 讀計數<65535(16位最大值) && 狀態設置成功(讀鎖的總體計數就是在這裏改的,注意加了一個默認值的操做)
        if (r == 0) { //讀鎖爲0
              firstReader = current;  //記錄第一個獲取鎖的線程
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { //是本身重入的
            firstReaderHoldCount++;
        } else { // 
            HoldCounter rh = cachedHoldCounter;  //用於計算讀計數的計數器
            if (rh == null || rh.tid != getThreadId(current))  
                cachedHoldCounter = rh = readHolds.get();  
            else if (rh.count == 0) 
                readHolds.set(rh);
            rh.count++;   //當前線程的讀計數+1
        }
        return 1;
    }
    return fullTryAcquireShared(current); //第7行條件不知足,則for循環獲取讀鎖(實際不會死循環的)
}
  讀鎖的獲取比較複雜,這裏主要有一個多線程各自計數的問題。對於讀鎖,除了要對全局的state中的讀鎖的計數進行修改,還要每一個線程各自維護一份本身重入的次數計數,這個計數存在一個ThreadLocal(readHolds)中的一個對象(cachedHoldCounter)裏邊。讀鎖獲取的邏輯是:沒有寫鎖佔用,則直接獲取讀鎖,這就是第7行的邏輯(固然,可能跟其它讀線程衝突致使獲取失敗,則進入fullTryAcquireShared(current));若是有寫鎖佔用了呢,就調用fullTryAcquireShared(current)獲取鎖,看一下源碼:

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) { // 有寫鎖
            if (getExclusiveOwnerThread() != current) // 寫鎖持有者不是當前線程,獲取失敗,經過aqs的doAcquireShared()進入排隊
                return -1;                                                   //這裏只作了不是當前線程的判斷,若是是當前線程,這個地方不能進行排隊,由於若已有寫線程在排隊的話,就會形成死鎖,源碼中else一句的英文備註就是說這個
        } else if (readerShouldBlock()) { //沒寫鎖,但可能有寫鎖在等待讀鎖釋放!!須要排隊
                       // 寫鎖空閒  且  公平策略決定 線程應當被阻塞
            	       // 下面的處理是說,若是是已獲取讀鎖的線程重入讀鎖時, 即便公平策略指示應當阻塞也不會阻塞。
                       // 不然,這也會致使死鎖的。
            if (firstReader == current) { //
                // assert firstReaderHoldCount > 0;
            } else {
            // threadlocal 相關處理
                if (rh.count == 0)    // 須要阻塞且是非重入(還未獲取讀鎖的),獲取失敗
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) { // cas 搶鎖成功
   	//threadlocal相關處理
            return 1;
        }
    }
}
   讀鎖的釋放:
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {// 清理firstReader緩存 或 readHolds裏的重入計數
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) { //徹底釋放讀鎖
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;  // 主要用於重入退出
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
// 釋放讀鎖對其餘讀線程沒有任何影響,
// 但能夠容許等待的寫線程繼續,若是讀鎖、寫鎖都空閒。
            return nextc == 0;
    }
}
-------------------------------------------------------------------------
  讀寫鎖的源碼,讀起來比想象中的難度大得多,緣由是讀鎖的部分設計比較複雜,主要涉及鎖降級,以及幾個變量跟threadlocal優化性能的處理。照着《java併發編程的藝術》跟一些博客看了2天,仍是沒把這部分徹底理清楚,這個艱苦的工做之後繼續搞吧。
  咱們看一下關於鎖的降級跟升級的問題,看是如何實現的:
  鎖降級的定義:鎖降級指的是寫鎖降級爲讀鎖。若是當前線程持有寫鎖,而後將其釋放,最後再獲取讀鎖,這種分段完成的過程不能稱之爲鎖降級。鎖降級指的是把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨後釋放(先前擁有的)寫鎖的過程。----《java併發編程的藝術》
  這段話的理解是不難的,但要在讀寫鎖的源碼中去找到與之對應的邏輯是不太好找的。實際上:

if (exclusiveCount(c) != 0) {
    if (getExclusiveOwnerThread() != current)
        return -1;
  這個就是與之相關的邏輯代碼,在tryAcquireShared跟fullTryAcquireShared(Thread current) 中都有體現。
  上面的代碼的意思是:當寫鎖被持有時,若是持有該鎖的線程不是當前線程,就返回 「獲取鎖失敗」,反之就會繼續獲取讀鎖。稱之爲鎖降級。
  關於鎖降級,書中還給出了一個例子:

public void processCachedData() {
    readLock.lock();
    if(!update){
        //必須先釋放讀鎖
        readLock.unlock();
        //鎖降級從寫鎖獲取到開始
        writeLock.lock();
        try{
            if(!update){
                //準備數據的流程(略)
                update = true;
            }
            readLock.lock();
        }finally {
            writeLock.unlock();
        }
        //鎖降級完成,寫鎖降級爲讀鎖
    }
    try{
        //使用數據的流程
    }finally {
        readLock.unlock();
    }
}
  有一段文字說明:鎖降級中的讀鎖獲取是否必要呢?答案是必要的。主要是爲了保證數據的可見性,若是當前線程不獲取讀鎖而是直接釋放寫鎖,假設另外一個線程獲取了寫鎖並修改了數據,那麼當前線程沒法感知該線程的數據更新。
  但是,,,這裏有個疑問,另外一個線程獲取了寫鎖,你當前線程還能獲取讀鎖嗎?既然不能獲取,何來沒法感覺數據更新一說?這個地方感受有點問題。網上博文基本千篇一概也是說可見性如何的,跟書中觀點同樣,我以爲不對,比較贊同 https://www.jianshu.com/p/cd485e16456e這個所說的。
  至於ThreadLocal相關代碼,稍後再去理這裏邊的邏輯。
相關文章
相關標籤/搜索