簡單看看讀寫鎖ReentantReadWriteLock

  前面咱們看了可重入鎖ReentrantLock,其實這個鎖只適用於寫多讀少的狀況,就是多個線程去修改一個數據的時候,適合用這個鎖,可是若是多個線程都去讀一個數據,還用這個鎖的話會下降效率,由於同一時刻只能是一個線程去讀取!java

  本次咱們看看讀寫鎖ReentantReadWriteLock,這個鎖採用了讀寫分離的策略,分紅了讀鎖和寫鎖,多個線程能夠同時獲取讀鎖;安全

 

一.簡單使用讀寫鎖工具

  啥也別問,問就是先會用了再說,還記得前面用ReentrantLock實現了一個線程安全的List嗎?咱們可使用讀寫鎖稍微改造一下就行了;ui

package com.example.demo.study;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Study0204 {
    // 線程不安全的List
    private ArrayList<String> list = new ArrayList<String>();
    // 讀寫鎖
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // 獲取讀鎖
    private final Lock readLock = lock.readLock();
    // 獲取寫鎖
    private final Lock writeLock = lock.writeLock();

    // 往集合中添加元素,寫鎖
    public void add(String str) {
        writeLock.lock();
        try {
            list.add(str);
        } finally {
            writeLock.unlock();
        }
    }

    // 刪除集合中的元素,寫鎖
    public void remove(String str) {
        writeLock.lock();
        try {
            list.remove(str);
        } finally {
            writeLock.unlock();
        }
    }

    // 根據索引獲取集合中某個元素,讀鎖
    public String get(int index) {
        readLock.lock();
        try {
            return list.get(index);
        } finally {
            readLock.unlock();
        }
    }

}

 

二.讀寫鎖的結構spa

  這裏最核心的仍是用了AQS,能夠看到裏面仍是有一個Sync這個內部工具類,而後還有兩個內部工具類,一個是讀鎖ReadLock,一個是寫鎖WriteLock線程

 

  咱們還能看到Sync這個類就是繼承AQS,而後有NonfairSync和FairSync這兩個類去繼承Sync,到這裏結構仍是和ReentrantLock是同樣的;code

 

  咱們再看看讀鎖和寫鎖,能夠看出來就是實現了Lock這個接口,而後經過傳進去的sync對象去實現Lock中的全部方法對象

 

  大概的結構就是這樣的,咱們可使用下面這個圖顯示出來,其實ReentrantReadWriteLock最重要的就是三個類:blog

    一個是Sync工具類用於操做AQS阻塞隊列和state的值,並且有基於Sync實現的公平策略和非公平策略;繼承

    一個是寫鎖,實現了Lock接口,內部有個Sync字段,在Lock的實現方法中就是調用Sync對象的方法去實現的

    另一個是讀鎖,和寫鎖同樣,實現了Lock接口,內部有個Sync字段,在Lock的實現方法也都是調用Sync對象的方法實現

 

 二.分析Sync 

  上篇博客中咱們知道了在ReentrantLock中的state表示的是鎖的可重入次數,並且state是AQS中定義的int類型,那麼在讀寫鎖這裏有兩個狀態是怎麼表示呢?

  總有一些人會想到一些花裏胡哨的東西,還別說,挺管用的,因爲state是int類型,共有32位,咱們能夠一分爲二,前面的16位叫作高16位,表示獲取讀鎖的次數,後面的叫作的低16位,表示寫鎖的可重入次數,具體的,咱們能夠看看Sync類的屬性,主要是涉及到基本的二進制運算,有興趣的能夠研究一下;

abstract static class Sync extends AbstractQueuedSynchronizer {

    //這個能夠說是讀鎖(共享鎖)移動的位數
    static final int SHARED_SHIFT   = 16;
    //讀鎖狀態單位值,這裏就是將1有符號左移16位,1用二進制表示爲:00000000 00000000 00000000 00000001
    //左移16位以後:00000000 00000001 00000000 00000000,也就是2的16次方,就是65536
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    //讀鎖的線程最大數65535個
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

    //寫鎖(排它鎖)掩碼,這裏用二進制表示 00000000 00000000 11111111 11111111
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //返回讀鎖的個數,這裏也就是將state無符號右移16位,那麼有效數字確定就是高16位,轉成十進制後就是獲取讀鎖的次數
    static int sharedCount(int c)    { 
        return c >>> SHARED_SHIFT; 
    }

    //返回寫鎖的個數,這裏就是將state和上面的寫鎖掩碼作按位與運算,高16位被置爲0,有效數字位第16位,轉成十進制就是寫鎖的可重入次數
    static int exclusiveCount(int c) { 
        return c & EXCLUSIVE_MASK;
    }

    //省略不少代碼
}

 

  其中,Sync中還有幾個比較重要的屬性以下,不懂沒關係,後面用到了再回頭看看就好;

 
 //記錄第一個獲取讀鎖的線程
 private transient Thread firstReader = null;

 //記錄第一個獲取讀鎖的線程繼續獲取讀鎖的可重入次數
 private transient int firstReaderHoldCount;

 //記錄最後一個獲取讀鎖的線程獲取讀鎖的可重入次數,HoldCounter類以下
 private transient HoldCounter cachedHoldCounter;

static final class HoldCounter {
    int count = 0;
    final long tid = getThreadId(Thread.currentThread());
}

//記錄除去第一個獲取讀鎖的線程外,獲取的讀鎖的可重入次數,ThreadLocalHoldCounter類以下
private transient ThreadLocalHoldCounter readHolds;

static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
  
        

 

三.寫鎖的獲取和釋放

  寫鎖在獲取的時候有一個前提:沒有其餘線程持有寫鎖或者讀鎖,當前線程才能獲取寫鎖,不然就把當前線程丟到阻塞隊列裏去了,記住,不能一邊讀還一邊寫

  1.lock方法:

  寫鎖主要是ReentantReadWriteLock中的內部類WriteLock類實現的,這是一個獨佔鎖,同一時刻只能有一個線程能夠獲取該鎖,時刻重入的;

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    //這個方法的是實現static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; },就是將state的低十六位轉爲十進制,
    //也就是寫鎖的可重入次數
    int w = exclusiveCount(c);
    //state不爲0,說明讀鎖或者寫鎖被佔用了
    if (c != 0) {
        //若是w==0,而c!=0,說明c的高16爲不爲0,即有線程獲取了讀鎖,此時寫鎖是不能獲取的,注意,別人在讀的時候是不能寫入的呀!返回false
        //若是w!=0表示有線程獲取了寫鎖,可是佔用鎖的線程不是當前線程,那麼線程獲取寫鎖失敗,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //到這裏說明寫鎖能夠獲取成功,那麼就要判斷寫鎖的可重入次數是否大於65535
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //將state加一
        setState(c + acquires);
        return true;
    }
    //能到這裏來,說明c==0,也就是說讀鎖和寫鎖都在空閒着,下面咱們要看看公平策略下和非公平下的writerShouldBlock實現
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

 

  咱們要看看最後的那裏的if語句,其中writerShouldBlock的實現,能到這裏說明讀鎖和寫鎖都在空閒這,能夠隨時去獲取;

  非公平策略下始終返回的是false,因而會走到compareAndSetState(c, c + acquires),這裏是用CAS嘗試獲取寫鎖,獲取失敗的話就返回發了;獲取成功的話就走到setExclusiveOwnerThread(current);設置佔用讀鎖的線程是當前線程;

 

  公平策略下的話,這個方法前面好像說過,就是判斷當前線程節點前面有沒有前驅節點,若是有的話那就確定獲取失敗啊,要讓前驅節點先獲取,因而在上面最後的if那裏直接返回false;若是這裏判斷沒有前驅節點,這裏返回true,那麼上面就會走到最後setExclusiveOwnerThread(current)設置當前線程佔有寫鎖

 

  2.tryLock方法

  這個方法和上面的lock方法同樣,注意,這裏最後那裏默認使用的是非公平模式;

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //這裏默認使用的是非公平模式
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

 

 

  3.unlock方法

public void unlock() {
    sync.release(1);
}
//這個是AQS中的方法,說過tryRelease留給具體子類去實現的,重點看看怎麼實現的
public final boolean release(int arg) {
if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
    return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
    //isHeldExclusively方法在下面,由於是當前線程調用的release方法,要判斷當前線程是否是持有寫鎖的線程,不是的話就拋錯了
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //state減一
    int nextc = getState() - releases;
    //獲取低十六位,看是否是等於0,若是等於0,說明此時沒有線程佔用寫鎖,因而就調用setExclusiveOwnerThread(null)
    //將佔用寫鎖的線程設置爲null,最後更新state就好了
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

 

四.讀鎖的獲取和釋放

  結合前面的寫鎖一塊兒說一下:

    (1).沒有其餘線程持有寫鎖或者讀鎖,當前線程才能獲取寫鎖,不然就把當前線程丟到阻塞隊列裏去了;當前線程獲取了寫鎖以後,其餘線程不能獲取寫鎖和讀鎖;

    (2)沒有其餘線程獲取寫鎖時,當前線程才能獲取讀鎖,不然就丟到阻塞隊列裏去了,不能 一邊讀一邊寫;當前線程獲取了讀鎖以後,其餘線程只能獲取讀鎖,不能獲取寫鎖;

    (3)當前線程獲取的寫鎖以後,還能繼續獲取寫鎖,這叫作可重入;也能夠繼續獲取讀鎖,這叫作鎖降級;

    (4)當前線程獲取的讀鎖以後,還能繼續獲取讀鎖;

  1.lock方法

public void lock() {
    //acquireShared方法在AQS中
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    //tryAcquireShared實如今ReentrantReadWriteLock中的Sync中
    if (tryAcquireShared(arg) < 0)
        //這個方法在AQS中,主要是將當前線程放到阻塞隊列中
        doAcquireShared(arg);
    }
protected final int tryAcquireShared(int unused) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    //這裏就是判斷:若是其餘線程獲取了寫鎖,那麼就返回-1
    //先判斷寫鎖的可重入次數不爲0,表示有線程佔用寫鎖,並且還不是當前線程,那麼直接返回-1
    //這裏注意一下:一個線程在獲取寫鎖以後,還能夠再獲取讀鎖,釋放的時候兩個所都要釋放啊!!!
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    //這個方法獲取讀鎖的次數,讀鎖的次數最多隻能是65535個
    int r = sharedCount(c);
    //readerShouldBlock方法分爲公平策略和非公平策略,這個方法的意思就是:當前線程獲取已經獲取讀鎖,再讀鎖被阻塞了,那麼說明還有其餘線程正在獲取寫鎖
    //若是返回false,說明此時沒有線程獲取寫鎖,並且這個方法分爲公平策略和非公平策略
    //公平策略的話,若是當前線程節點有前驅節點就返回true,沒有前驅節點返回false;
    //非公平策略的話,判斷阻塞隊列中哨兵節點後面的那個節點是否是正在獲取寫鎖,是的話返回true,不是的話返回false
    //compareAndSetState(c, c + SHARED_UNIT)方法中,SHARED_UNIT表示65536,這個CAS表示對高16爲增長1,對於整個32位來講,就是加2的16次方
    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        //r==0表示讀鎖空閒,因而記錄第一個讀鎖,和第一個獲取讀鎖的線程獲取讀鎖的可重入次數
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //若是當前線程就是第一個獲取讀鎖的線程,再獲取讀鎖,這裏就將可重入次數加一便可
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            //能到這裏就說明讀鎖已經被其餘線程佔用,當前線程是最後一個是最後獲取讀鎖的線程,咱們更新一下cacheHoldCounter和readHolds就好了
            //cacheHoldCounter表示最後一個獲取讀鎖的線程獲取讀鎖的可重入次數
            //readHolds記錄了除了第一個獲取讀鎖的線程外,其餘線程獲取讀鎖的可重入次數
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //能到這裏說明readerShouldBlock方法返回的是true,並且當前線程在以前已經獲取了寫鎖,再獲取讀鎖,就是鎖降級!!!
    return fullTryAcquireShared(current);
}

//鎖降級操做
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        //寫鎖被其餘線程佔用,就返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        //獲取讀鎖被阻塞,此時還有其餘線程在獲取寫鎖,
        } else if (readerShouldBlock()) {
            if (firstReader == current) {
            } 
            else {
                //有其餘線程在嘗試獲取寫鎖,結束當前線程獲取讀鎖,就更新一下readHolds就好了
                //就從readHolds中移除當前線程的持有數,而後返回-1,結束嘗試獲取鎖步驟
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        //讀鎖數量達到了最大數量就拋錯
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS更新讀鎖的數量,而後更新一些變量
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            //讀鎖數量爲0,就讓當前線程做爲第一個獲取讀鎖的線程
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            //當前線程已經獲取過讀鎖了,就把第一個獲取讀鎖的可重入次數加一
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
            //這裏前面說過了 if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

 

 

  2.tryLock方法

public boolean tryLock() {
    return sync.tryReadLock();
}

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        //若是當前寫鎖已經被佔用,獲取讀鎖失敗
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        //讀鎖不能超過最大數量
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //更新state,將高16位加一,更新成功,若是讀鎖沒有線程佔有,就把當前線程更新爲第一個獲取讀鎖的線程和更新第一個獲取讀鎖的可重入次數
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            //當前線程就是第一個獲取讀鎖的線程,就將可重入次數加一
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
            //到這裏說明當前線程獲取讀鎖成功,雖然不是第一個獲取讀鎖的線程,因而更新一下cachedHoldCounter和readHolds
            //cachedHoldCounter:最後一個線程獲取讀鎖的可重入次數
            //readHolds:除去第一個線程,其餘線程獲取讀鎖的可重入次數
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

 

 

  3.unlock方法

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    //實如今下面,就是嘗試釋放讀鎖,並判斷還有沒有線程佔用讀鎖,沒有線程佔用讀鎖,就會進入到if裏面doReleaseShared方法
    if (tryReleaseShared(arg)) {
        //前面可能有一些線程在獲取寫鎖的時候,因爲當前線程讀鎖沒有釋放,因此那些線程就被阻塞了
        //當前方法就是把那些線程釋放一個
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //若是當前線程是第一個獲取讀鎖的線程
    if (firstReader == current) {
        //第一個線程獲取讀鎖的可重入次數爲1,就釋放,不然,可重入次數減一
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    //當前線程不是第一個獲取讀鎖的線程就更新cachedHoldCounter和readHolds    
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //這裏一個無限循環,獲取state,將高十六位減一,用CAS更新,更新成功的話,就判斷讀鎖有沒有被佔用
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

 

  

五.總結

  咱們用下面這個圖來總結一下ReentrantReadWriteLock,這個鎖就是利用state是32位的,高16位用於表示讀鎖的個數,低16位表示寫鎖的可重入次數,經過CAS對其進行了讀寫分離,適用於讀多寫少的場景;

相關文章
相關標籤/搜索