Java併發編程筆記之讀寫鎖 ReentrantReadWriteLock 源碼分析

咱們知道在解決線程安全問題上使用 ReentrantLock 就能夠,可是 ReentrantLock 是獨佔鎖,同時只有一個線程能夠獲取該鎖,而實際狀況下會有寫少讀多的場景,顯然 ReentrantLock 知足不了需求,因此 ReentrantReadWriteLock 應運而生,ReentrantReadWriteLock 採用讀寫分離,多個線程能夠同時獲取讀鎖。java

 

首先咱們先看一下,ReentrantReadWriteLock 內部構造先看下它的類圖結構以下圖所示:安全

如上圖能夠看到讀寫鎖內部維護了一個ReadLock和WriteLock,而且也提供了公平和非公平的實現,下面只介紹下非公平的讀寫鎖的實現,咱們知道AQS裏面維護了一個state狀態,app

而ReentrantReadWriteLock 則須要維護讀狀態和寫狀態,一個state是沒法表示寫和讀狀態的。ReentrantReadWriteLock 巧妙的使用 state 的高 16 位表示讀狀態,性能

也就是獲取改讀鎖的線程個數,低 16 位 表示獲取到寫鎖的線程的可重入次數。並經過CAS對其進行操做實現了讀寫分離,在讀多寫少的場景下比較適用。ui

接下來用一張圖來加深對 ReentrantReadWriteLock 的理解:spa

 

 

首先咱們先看ReentrantReadWriteLock 的內部類Sync的一些關鍵屬性和方法,源碼以下:線程

static final int SHARED_SHIFT   = 16;

//共享鎖(讀鎖)狀態單位值65536 
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
//共享鎖線程最大個數65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

//排它鎖(寫鎖)掩碼 二進制 15個1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//用來記錄最後一個獲取讀鎖的線程獲取讀鎖的可重入次數
private transient HoldCounter cachedHoldCounter;
//用來記錄第一個獲取到讀鎖的線程
private transient Thread firstReader;
//用來記錄第一個獲取到讀鎖的線程獲取讀鎖的可重入次數
private transient int firstReadHoldCount;
//用來存放除去第一個獲取讀鎖線程外的其餘線程獲取讀鎖的可重入次數
private transient ThreadLocalHoldCounter readHolds = new ThreadLocalHoldCounter();
/** 返回讀鎖線程數 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 返回寫鎖可重入個數 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

類圖中 firstReader用來記錄第一個獲取到讀鎖的線程,firstReadHoldCount則記錄第一個獲取到讀鎖的線程獲取讀鎖的可重入數。cachedHoldCounter用來記錄最後一個獲取讀鎖的線程獲取讀鎖的可重入次數。code

接下咱們進入ReentrantReadWriteLock 的內部類Sync的內部類HoldCounter類的源碼,以下:對象

static final class HoldCounter {
       int count = 0;
       //線程id
       final long tid = getThreadId(Thread.currentThread());
   }

 

readHolds 是ThreadLocal 變量,用來存放除去第一個獲取讀鎖線程外的其餘線程獲取讀鎖的可重入次數,可知ThreadLocalHoldCounter繼承了ThreadLocal,裏面initialValue方法返回一個HoldCounter對象,源碼以下:blog

  static final class ThreadLocalHoldCounter
       extends ThreadLocal<HoldCounter> {
       public HoldCounter initialValue() {
           return new HoldCounter();
       }
   }

 

接下來進行寫鎖的獲取與釋放講解,以下:

ReentrantReadWriteLock 中寫鎖是使用的 WriteLock 來實現的。咱們先看一下寫鎖WriteLock的獲取與釋放方法,以下:

  1.void lock() 寫鎖是個獨佔鎖,同時只有一個線程能夠獲取該鎖。 若是當前沒有線程獲取到讀鎖和寫鎖則當前線程能夠獲取到寫鎖而後返回。 若是當前已經有線程取到讀鎖和寫鎖則當前線程則當前請求寫鎖線程會被阻塞掛起。

另外寫鎖是可重入鎖,若是當前線程已經獲取了該鎖,再次獲取的只是簡單的把可重入次數加一後直接返回。源碼以下:

  public void lock() {
       sync.acquire(1);
   }
   public final void acquire(int arg) {
        // sync重寫的tryAcquire方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

如上代碼,lock()內部調用了AQS的acquire方法,其中的tryAcquire是ReentrantReadWriteLock 內部 sync 類重寫的,代碼以下:

  protected final boolean tryAcquire(int acquires) {

            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            //(1) c!=0說明讀鎖或者寫鎖已經被某線程獲取
            if (c != 0) {
                (2//w=0說明已經有線程獲取了讀鎖或者w!=0而且當前線程不是寫鎖擁有者,則返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
               (3//說明某線程獲取了寫鎖,判斷可重入個數
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");

               (4// 設置可重入數量(1)
                setState(c + acquires);
                return true;
            }

           (5//第一個寫線程獲取寫鎖
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
   }

如上代碼(1),若是當AQS狀態值不爲0 則說明當前已經有線程獲取到了讀鎖或者寫鎖,代碼(2)若是w == 0 說明狀態值的低 16 爲0,而狀態值不爲0,則說明高16位不爲0,這暗示已經有線程獲取了讀鎖,因此直接返回false。

若是w != 0 說明當前已經有線程獲取了該寫鎖,則看當前線程是否是該鎖的持有者,若是不是則返回false。

執行到代碼(3)說明當前線程以前獲取到了該鎖,則判斷該線程的可重入此時是否是超過了最大值,是則拋異常,不然執行代碼(4)增長當前線程的可重入次數,而後返回true。

若是AQS的狀態值等於0,則說明目前沒有線程獲取到讀鎖和寫鎖,則實行代碼(5),

其中對於ReentrantReadWriteLock的子類NofairSync的writerShouldBlock方法的非公平鎖的實現源碼以下:

final boolean writerShouldBlock() {
       return false; // writers can always barge
   }

如代碼對於非公平鎖來講固定返回false,則說明代碼(5)搶佔式執行CAS嘗試獲取寫鎖,獲取成功則設置當前鎖的持有者爲當前線程返回true,不然返回false。

 

對於對於ReentrantReadWriteLock的子類FairSync的writerShouldBlock方法的公平鎖的實現源碼以下:

final boolean writerShouldBlock() {
  return hasQueuedPredecessors();
}

可知仍是使用 hasQueuedPredecessors 來判斷當前線程節點是否有前驅節點,若是有則當前線程放棄獲取寫鎖的權限直接返回 false。

 

  2.void lockInterruptibly() 相似 lock() 方法,不一樣在於該方法對中斷響應,也就是當其它線程調用了該線程的 interrupt() 方法中斷了當前線程,當前線程會拋出異常 InterruptedException,源碼以下:

    public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
       }

 

  3.boolean tryLock() 嘗試獲取寫鎖,若是當前沒有其它線程持有寫鎖或者讀鎖,則當前線程獲取寫鎖會成功,而後返回 true。 若是當前已經其它線程持有寫鎖或者讀鎖則該方法直接返回 false,當前線程並不會被阻塞。

若是當前線程已經持有了該寫鎖則簡單增長 AQS 的狀態值後直接返回 true。源碼以下:

  public boolean tryLock( ) {
       return sync.tryWriteLock();
   }
  final boolean tryWriteLock() {
       Thread current = Thread.currentThread();
       int c = getState();
       if (c != 0) {
           int w = exclusiveCount(c);
           if (w == 0 || current != getExclusiveOwnerThread())
               return false;
           if (w == MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
       }
       if (!compareAndSetState(c, c + 1))
           return false;
       setExclusiveOwnerThread(current);
       return true;
   }

如上代碼與tryAcquire 方法相似,這裏再也不講述,不一樣在於這裏使用的非公平策略

 

  4.boolean tryLock(long timeout, TimeUnit unit) 與 tryAcquire()不一樣在於多了超時時間的參數,若是嘗試獲取寫鎖失敗則會把當前線程掛起指定時間,待超時時間到後當前線程被激活,若是仍是沒有獲取到寫鎖則返回 false。

另外該方法對中斷響應, 也就是當其它線程調用了該線程的 interrupt() 方法中斷了當前線程,當前線程會拋出異常 InterruptedException。源碼以下:

public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

 

  5.void unlock() 嘗試釋放鎖,若是當前線程持有該鎖,調用該方法會讓該線程對該線程持有的 AQS 狀態值減一,若是減去 1 後當前狀態值爲 0 則當前線程會釋放對該鎖的持有,否者僅僅減一而已。

若是當前線程沒有持有該鎖調用了該方法則會拋出 IllegalMonitorStateException 異常 ,源碼以下:

public void unlock() {
    sync.release(1);
}
  public final boolean release(int arg) {
        //調用ReentrantReadWriteLock中sync實現的tryRelease方法
        if (tryRelease(arg)) {
            //激活阻塞隊列裏面的一個線程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
           //(6) 看是不是寫鎖擁有者調用的unlock
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
           //(7)獲取可重入值,這裏沒有考慮高16位,由於寫鎖時候讀鎖狀態值確定爲0
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
           //(8)若是寫鎖可重入值爲0則釋放鎖,否者只是簡單更新狀態值。
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
      }

如上代碼 tryRelease 首先經過 isHeldExcusively判斷是否當前線程是該寫鎖的持有者,若是不是則拋異常,不然執行代碼(7)說明當前線程持有寫鎖,持有寫鎖說明狀態值的高16位爲0,因此這裏nextc值就是當前線程寫鎖的剩餘可重入次數。

代碼(8)判斷當前可重入次數是否爲0,若是free爲true 說明可重入次數爲0,則當前線程會釋放對寫鎖的持有,當前鎖的持有者設置爲null。若是free 爲false,則簡單更新可重入次數。

  

 

前面講解了寫鎖的獲取與釋放,接下來說解讀鎖的獲取與釋放,以下:

ReentrantReadWriteLock 中寫鎖是使用的 ReadLock 來實現的。主要看ReadLock讀鎖的獲取與釋放的主要方法,以下:

  1.void lock() 獲取讀鎖,若是當前沒有其它線程持有寫鎖,則當前線程能夠獲取讀鎖,AQS 的高 16 位的值會增長 1,而後方法返回。否者若是其它有一個線程持有寫鎖,則當前線程會被阻塞。源碼以下:

public void lock() {
   sync.acquireShared(1);
}
  public final void acquireShared(int arg) {
        //調用ReentrantReadWriteLock中的sync的tryAcquireShared方法
        if (tryAcquireShared(arg) < 0)
           //調用AQS的doAcquireShared方法
            doAcquireShared(arg);
    }

如上代碼讀鎖的lock方法調用了AQS的aquireShared方法,內部調用了 ReentrantReadWriteLock 中的 sync 重寫的 tryAcquireShared 方法,源碼以下:

protected final int tryAcquireShared(int unused) {

   //(1)獲取當前狀態值
    Thread current = Thread.currentThread();
    int c = getState();

    //(2)判斷是否寫鎖被佔用
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;

    //(3)獲取讀鎖計數
    int r = sharedCount(c);
    //(4)嘗試獲取鎖,多個讀線程只有一個會成功,不成功的進入下面fullTryAcquireShared進行重試
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //(5)第一個線程獲取讀鎖
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //(6)若是當前線程是第一個獲取讀鎖的線程
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            //(7)記錄最後一個獲取讀鎖的線程或記錄其它線程讀鎖的可重入數
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //(8)相似tryAcquireShared,可是是自旋獲取
    return fullTryAcquireShared(current);
}

如上代碼,首先獲取了當前AQS的狀態值,而後代碼(2)看是否有其餘線程獲取到了寫鎖,若是是則直接返回了-1,而後調用AQS的doAcquireShared 方法把當前線程放入阻塞隊列。

不然執行到代碼(3)獲得獲取到讀鎖的線程個數,到這裏要說明目前沒有線程獲取到寫鎖,可是仍是有可能有線程持有讀鎖,而後執行代碼(4),非公平鎖的readerShouldBlock實現代碼以下:

final boolean readerShouldBlock() {
     return apparentlyFirstQueuedIsExclusive();
}
 final boolean apparentlyFirstQueuedIsExclusive() {
   Node h, s;
   return (h = head) != null && (s = h.next)  != null && !s.isShared() && s.thread != null;
  }

如上代碼做用是若是隊列裏面存在一個元素,則判斷第一個元素是否是正在嘗試獲取寫鎖,若是不是的話,則當前縣城使用判斷當前獲取讀鎖線程是否達到了最大值,最後執行CAS操做設置AQS狀態值的高 16 位值增長 1。

代碼(5)(6)記錄第一個獲取讀鎖的線程,並統計該線程獲取讀鎖的可重入次數,代碼(7)使用cachedHoldCounter 記錄最後一個獲取到讀鎖的線程,並同時該線程獲取讀鎖的可重入次數,另外readHolds記錄了當前線程獲取讀鎖的可重入次數。

若是readerShouldBlock 返回 true 則說明有線程正在獲取寫鎖,則執行代碼(8)fullTryAcquireShared 代碼與 tryAcquireShared 相似,不一樣在於前者是經過循環自旋獲取。

源碼以下:

    final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

 

  2.void lockInterruptibly() 相似 lock() 方法,不一樣在於該方法對中斷響應,也就是當其它線程調用了該線程的 interrupt() 方法中斷了當前線程,當前線程會拋出異常 InterruptedException。

 

  3.boolean tryLock() 嘗試獲取讀鎖,若是當前沒有其它線程持有寫鎖,則當前線程獲取寫鎖會成功,而後返回 true。若是當前已經其它線程持有寫鎖則該方法直接返回 false,當前線程並不會被阻塞。

若是其它獲取當前線程已經持有了該讀鎖則簡單增長 AQS 的狀態值高 16 位後直接返回 true。代碼相似 tryLock 這裏再也不講述。

 

  4.boolean tryLock(long timeout, TimeUnit unit) 與 tryLock()不一樣在於多了超時時間的參數,若是嘗試獲取讀鎖失敗則會把當前線程掛起指定時間,待超時時間到後當前線程被激活,若是仍是沒有獲取到讀鎖則返回 false。

另外該方法對中斷響應, 也就是當其它線程調用了該線程的 interrupt() 方法中斷了當前線程,當前線程會拋出異常 InterruptedException

 

  5.void unlock() 釋放鎖。源碼以下:

public void unlock() {
   sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //若是當前線程是第一個獲取讀鎖線程
    if (firstReader == current) {
        //若是可重入次數爲1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else//否者可重入次數減去1
            firstReaderHoldCount--;
    } else {
        //若是當前線程不是最後一個獲取讀鎖線程,則從threadlocal裏面獲取
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        //若是可重入次數<=1則清除threadlocal
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        //可重入次數減去一
        --rh.count;
    }

    //循環直到本身的讀計數-1 cas更新成功
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))

            return nextc == 0;
    }
}

 

 

好了,到目前爲止,咱們知道了上一篇筆記中,使用ReentrantLock 實現的線程安全的 list, 可是因爲 ReentrantLock 是獨佔鎖因此在讀多寫少的狀況下性能不好,下面使用 ReentrantReadWriteLock 來改造爲以下代碼:

package com.hjc;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Created by cong on 2018/6/14.
 */
public class ReentrantReadWriteLockTest {
    //線程不安全的list
    private ArrayList<String> array = new ArrayList<String>();
    //獨佔鎖
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    //添加元素
    public void add(String e) {

        writeLock.lock();
        try {
            array.add(e);

        } finally {
            writeLock.unlock();

        }
    }
    //刪元素
    public void remove(String e) {

        writeLock.lock();
        try {
            array.remove(e);

        } finally {
            writeLock.unlock();

        }
    }

    //獲取數據
    public String get(int index) {

        readLock.lock();
        try {
            return array.get(index);

        } finally {
            readLock.unlock();

        }
    }
}

如代碼調用 get 方法適合使用的是讀鎖,這樣運行多個讀線程同時訪問 list 的元素,在讀多寫少的狀況下性能相比 ReentrantLock 會很好。

相關文章
相關標籤/搜索