一個比讀寫鎖更快的鎖----StampedLock

簡介

ReentrantReadWriteLock支持讀寫鎖,StampedLock支持寫鎖、悲觀鎖讀和樂觀讀(無鎖)。其中寫鎖、悲觀讀鎖的語義和ReentrantReadWriteLock中的寫鎖、讀鎖語義同樣,都是容許多個線程同時獲取悲觀鎖讀,可是隻容許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。java

不一樣的是:StampedLock 裏的寫鎖和悲觀讀鎖加鎖成功以後,都會返回一個 stamp;而後解鎖的時候,須要傳入這個 stamp。node

如下爲官方使用例子緩存

public class Point {

  private final StampedLock sl = new StampedLock();
  private double x, y;

  void move(double deltaX, double deltaY) {
    long stamp = sl.writeLock();
    try {
      x += deltaX;
      y += deltaY;
    } finally {
      sl.unlockWrite(stamp);
    }
  }

  double distanceFromOrigin() {
    long stamp = sl.tryOptimisticRead();
    double currentX = x, currentY = y;
    if (!sl.validate(stamp)) {
        stamp = sl.readLock();
        try {
          currentX = x;
          currentY = y;
        } finally {
          sl.unlockRead(stamp);
        }
    }
    // 若是處理業務須要保持互斥,那麼就用互斥鎖,若是不須要保持互斥才能夠
    // 用讀寫鎖。通常來說緩存是不須要保持互斥性的,能接受瞬間的不一致
    return Math.sqrt(currentX * currentX + currentY * currentY);
  }

  // StampedLock 支持鎖的降級(經過 tryConvertToReadLock() 方法)和升級(經過tryConvertToWriteLock() 方法)
  void moveIfAtOrigin(double newX, double newY) {
    long stamp = sl.readLock();
    try {
      while (x == 0.0 && y == 0.0) {
        long ws = sl.tryConvertToWriteLock(stamp);
        if (ws != 0L) {
          stamp = ws;
          x = newX;
          y = newY;
          break;
        } else {
          sl.unlockRead(stamp);
          stamp = sl.writeLock();
        }
      }
    } finally {
      sl.unlock(stamp);
    }
  }
}

主要數據結構

雖然StampedLock沒有使用AQS,不過它的數據結構中仍是用到了CLH隊列。數據結構

WNode是CLH隊列的節點,其源碼以下併發

static final class WNode {
    // 前驅節點
    volatile WNode prev;

    // 後繼節點
    volatile WNode next;

    // 獲取讀鎖的列表
    volatile WNode cowait;

    // 線程
    volatile Thread thread;

    // 節點狀態。0, WAITING, or CANCELLED
    volatile int status;
    
    // 讀模式或者寫模式。RMODE or WMODE
    final int mode;  
    WNode(int m, WNode p) { mode = m; prev = p; }
  }

StampedLock中其中重要屬性以下ui

//CLH隊列頭結點
  private transient volatile WNode whead;

  // CLH隊列尾節點
  private transient volatile WNode wtail;

  // 鎖狀態
  private transient volatile long state;
  
  // 讀鎖次數的額外計數器
  private transient int readerOverflow;

  // 讀鎖的位數
  private static final int LG_READERS = 7;

  // 計算state值常量
  private static final long RUNIT = 1L;
  // 寫鎖標誌位,十進制:128 二進制:1000 0000
  private static final long WBIT  = 1L << LG_READERS;

  // 讀狀態標誌位。 十進制:127  二進制: 0111 1111
  private static final long RBITS = WBIT - 1L;

  // state狀態中記錄讀鎖快滿了的值,126
  private static final long RFULL = RBITS - 1L;

  // 用來獲取讀寫狀態。 十進制:255 二進制:1111 1111
  private static final long ABITS = RBITS | WBIT;

  // -128 (1....1 1000 0000)
  private static final long SBITS = ~RBITS;

  // 狀態state的初始值 256 二進制: 00001 0000 0000
  private static final long ORIGIN = WBIT << 1;


  // 鎖的狀態。使用state來控制當前是讀鎖,仍是寫鎖
  private transient volatile long state;
  
  // 額外的讀鎖計數器
  private transient int readerOverflow;

  // state初始值爲256
  public StampedLock() {
      state = ORIGIN;
  }

看過我以前分析AQS源碼的應該對這個CLH很熟悉了。一樣的StampedLock也是經過這個stae變量來進行讀寫鎖判斷的。這個state承載了三個部分的內容this

狀態位

StampedLock中state是long類型,佔64位,它被劃爲了三個部分來使用。低7位做爲讀鎖標誌位,能夠由多個線程共享,每有一個線程申請讀鎖成功,低7位就加1。第8位是寫鎖位,由線程獨佔。
其他位是stamp位,用來記錄寫鎖狀態的變化(版本號),每使用一次寫鎖,stamp位就會加1。spa

同時若是讀鎖數量超過了126以後,超出的次數使用readerOverflow來進行計數。線程

當出現併發的狀況的時候,CLH隊列的排隊狀況是怎樣的呢?code

好比,線程w1獲取了寫鎖,一直未釋放。此時有4個線程分別獲取讀鎖(獲取順序是R0-->R1-->R2-->R3),又有線程W2獲取寫鎖,最後還有R4,R5,R6三個線程獲取讀鎖,那麼此時隊列的排隊狀況以下

CLH

由於讀鎖是能夠被多個線程獲取的,若是同一時間有多個線程來獲取讀鎖卻獲取不到時,這個時候第一個獲取讀鎖的線程會被加入到鏈表中,然後面的的讀線程會被加入到cowait棧中,
能夠認爲cowait是一條副鏈。

這裏的cowait能夠理解爲協同等待,表示將這些獲取讀鎖的線程做爲一個總體來獲取鎖。

獲取樂觀讀鎖

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

樂觀讀鎖邏輯比較簡單,就是判斷寫鎖是否被佔用(判斷state第8位的值是否爲1),若是寫鎖被佔用則返回0,不然返回stamp位。

state初始值爲256(1 0000 0000)

  01000 0000(WBIT)
& 10000 0000(state)
==============
  00000 0000

 1 0000 0000(state)
 1 1000 0000(SBITS)
 ==============
 1 0000 0000

檢測樂觀讀鎖

public boolean validate(long stamp) {

  // 經過UNSafe插入內存屏障,禁止重排序
  U.loadFence();
  return (stamp & SBITS) == (state & SBITS);
}

返回true: 表示期間沒有寫鎖,不關心讀鎖。

返回false: 表示期間有寫鎖發生

SBITS爲-128,用二進制表示是:1111 1111 1111 1000 0000

x xxxx xxxx(stamp)
1 1000 0000

1 yyyy yyyy(state)
1 1000 0000

SBITS後7位都是0,也就是不關心讀鎖,咱們只關心stamp位和寫位。

當咱們獲取樂觀讀時,若是此時已經有了寫鎖,那麼返回stamp值爲0,此時進行驗證確定爲false。

獲取普通讀鎖

public long readLock() {
  long s = state, next;
  // whead == wtail時,隊列爲空,表示當前沒有線程在排隊
  return ((whead == wtail && (s & ABITS) < RFULL &&
           U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
           // acquireRead的第一個參數爲false,標識不處理線程中斷
          next : acquireRead(false, 0L));
}

當獲取讀鎖成功時,會將state值加1。當條件不知足時(隊列不爲空,CAS失敗,或者讀鎖的個數已經大於等於126),都會進入到acquireRead()方法,這個方法主要分爲兩個大的for循環,代碼比較長我就不貼出來了。

它的主要邏輯以下:

  1. 若是當前隊列沒有排隊的線程,該線程是第一個準備排隊的元素,就有很大機會獲取到鎖,因此會先自旋嘗試獲取到鎖(自旋次數爲64),若是獲取到了將state值加1,若是讀鎖次數已經超出了126,則使用readerOverflow記錄超出的讀鎖數目。若是未獲取到鎖,則進入第2步
private long tryIncReaderOverflow(long s) {
 
  // 若是讀鎖的記錄數等於126,
  if ((s & ABITS) == RFULL) {
      // 將state的值加1
      // CAS以後state的後七位的值是127,state的整個值是383(1 0111 1111)。
      if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
          // 將readOverflow的值加1
          ++readerOverflow;
          state = s;
          return s;
      }
  }
  else if ((LockSupport.nextSecondarySeed() &
            OVERFLOW_YIELD_RATE) == 0)
      Thread.yield();
  return 0L;
}
  1. 初始化入隊排隊節點,造成鏈表關係。和AQS同樣的是head節點被當成哨兵節點或者正持有鎖正在運行的線程(虛擬)。因此head節點的thread爲null,mode的值爲WMODE。這樣會造成

head --> node <-- tail這樣的節點關係

  1. 若是又有新的線程獲取讀鎖失敗,會將新的線程加入到cowait棧中
  2. 對於不是第一個獲取讀鎖的線程,它會先加入到cowait這個棧中,而後再經過CAS獲取鎖,若是獲取失敗就被阻塞,等待被喚醒或者因爲超時中斷被取消。
  3. 會對第一個獲取讀鎖的線程進行優待,由於它前面沒有在排隊的線程,因此這一層仍是在自旋獲取鎖,只是自旋次數再增長,首先會自旋1024次得到鎖,若是還未獲取到,在自旋2048次。

若是還未等到鎖釋放,就阻塞當前線程,等待被喚醒,直到得到鎖或者超時中斷被取消。

第1-4步屬於一個大的循環,第5步驟屬於另一個大的循環。

同時當獲取讀鎖線程被喚醒獲取到鎖後,它同時也會喚醒掛在它身上的cowait棧中的線程。

從分析中能夠看到StampedLock中經過大量的自旋操做能夠必定程度避免線程阻塞,只要線程執行操做夠快,釋放鎖比較及時,能夠說幾乎不會存在阻塞。

釋放讀鎖

釋放讀鎖的代碼比較簡單,主要操做以下

  1. 首先判斷stamp是否合法,若是不合法則拋出IllegalMonitorStateException異常
  2. 若是讀鎖個數小於126,則經過CAS將state值減去1,若是釋放的是最後一個讀鎖,則須要喚醒隊列中的節點。
  3. 若是讀鎖個數已經溢出了,則將readerOverflow減去1
public void unlockRead(long stamp) {
  long s, m; WNode h;
  // 自旋
  for (;;) {
      // 判斷stamp是否合法
      if (((s = state) & SBITS) != (stamp & SBITS) ||
          (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
          throw new IllegalMonitorStateException();
      // 讀鎖個數小於126
      if (m < RFULL) {
          if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
              // 釋放最後一個讀鎖時,須要喚醒下一個節點
              if (m == RUNIT && (h = whead) != null && h.status != 0)
                  release(h);
              break;
          }
      }
      // 讀鎖個數飽和溢出,須要減小readerOverflow
      else if (tryDecReaderOverflow(s) != 0L)
          break;
  }
}

獲取寫鎖

public long writeLock() {
  long s, next;
  return ((((s = state) & ABITS) == 0L &&
           U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
          next : acquireWrite(false, 0L));
}

若是state中的第8位等於0而且CAS設置state值成功,則獲取鎖成功,不然進入到acquireWrite方法。

acquireWrite的邏輯也分爲兩部分,分別是兩個for循環。

第一個for循環邏輯以下

for (int spins = -1;;) { // spin while enqueuing
    long m, s, ns;

    // 若是當前寫鎖標誌位仍然爲0,即沒有其餘線程獲取到寫鎖
    if ((m = (s = state) & ABITS) == 0L) {
        // CAS將state的值加128,實質是將寫鎖狀態位加1
        if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
            return ns;
    }
    // 其餘線程獲取到了寫鎖,則設定自旋次數爲64次
    else if (spins < 0)
        spins = (m == WBIT && wtail == whead) ? SPINS : 0;
    else if (spins > 0) {
        // 隨機減小自旋次數
        if (LockSupport.nextSecondarySeed() >= 0)
            --spins;
    }
    // 自旋仍未獲取到寫鎖則執行下面的邏輯
    // 初始化CLH隊列(主要是whead,wtail節點)
    // p指向了wtail節點
    else if ((p = wtail) == null) {
        WNode hd = new WNode(WMODE, null);
        if (U.compareAndSwapObject(this, WHEAD, null, hd))
            wtail = hd;
    }
    // 初始化節點
    else if (node == null)
        node = new WNode(WMODE, p);

    // 最終造成 whead --> node <-- wtail
    else if (node.prev != p)
        node.prev = p;
    else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
        p.next = node;
        // 造成鏈表關係以後自旋結束
        break;
    }
}

第二個for循環代碼也比較長,我就不貼出來了。它的核心邏輯以下

  1. 若是仍然只有當前線程在等待鎖,則會先自旋1024次去獲取寫鎖,若是獲取失敗,則在自旋2048次再次去獲取。若是都獲取失敗,則進入第二步。
  2. 阻塞當前線程等待被喚醒

釋放寫鎖

public void unlockWrite(long stamp) {
  WNode h;
  // 驗證stamp是否合法
  if (state != stamp || (stamp & WBIT) == 0L)
    throw new IllegalMonitorStateException();

  // 釋放寫鎖,stamp位(版本號)加1
  // stamp+WBIT會將state的第8位置爲0,就至關於釋放了寫鎖
  state = (stamp += WBIT) == 0L ? ORIGIN : stamp;

  // 若是頭結點不爲空,而且狀態不爲0,,則調用release方法喚醒下一個節點
  if ((h = whead) != null && h.status != 0)
      release(h);
}
private void release(WNode h) {
  if (h != null) {
    WNode q; Thread w;
    U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
    // 若是頭結點的下一個節點爲空或者其狀態爲已取消
    if ((q = h.next) == null || q.status == CANCELLED) {
        // 從尾結點向前遍歷找到可用節點
        for (WNode t = wtail; t != null && t != h; t = t.prev)
            if (t.status <= 0)
                q = t;
    }
    // 喚醒q節點所在的線程
    if (q != null && (w = q.thread) != null)
        U.unpark(w);
  }
}

釋放寫鎖過程總結以下

  1. 將state的第8位置爲0(釋放寫鎖),並將stamp位(version)加1
  2. 喚醒下一個節點

鎖的降級(tryConvertToReadLock)

tryConvertToReadLock方法能夠用於鎖的降級。不過並非只有再獲取讀鎖時才能調用該方法。

public long tryConvertToReadLock(long stamp) {

  // a爲鎖標識,m則是最新的鎖標識
  long a = stamp & ABITS, m, s, next; WNode h;

  // state的寫鎖標誌位和版本號一致(有可能寫鎖標誌位是0,便可能是讀鎖)
  while (((s = state) & SBITS) == (stamp & SBITS)) {

      // 還未添加任何鎖標識
      if ((m = s & ABITS) == 0L) {
          if (a != 0L)
              break;
          // 讀鎖次數小於126
          else if (m < RFULL) {
              // 將state的值加1
              if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                  return next;
          }
          // 讀鎖次數已經超出了,使用額外字段記錄
          else if ((next = tryIncReaderOverflow(s)) != 0L)
              return next;
      }
      // 當前是寫鎖狀態
      else if (m == WBIT) {
          if (a != m)
              break;
          // 將讀鎖次數加1,寫鎖標誌置爲0,stamp位加1。即加129
          state = next = s + (WBIT + RUNIT);
          // 釋放鎖
          if ((h = whead) != null && h.status != 0)
              release(h);
          return next;
      }
      // 已是讀鎖了直接返回
      else if (a != 0L && a < WBIT)
          return stamp;
      else
          break;
  }
  // 轉換失敗
  return 0L;
}

它的主要邏輯以下,若是當前線程還未加任何鎖,則加上寫鎖並返回最新的stamp值。若是當前線程已是寫鎖,則釋放寫鎖,並更新state的值(讀鎖加1,寫鎖狀態爲置爲0,version加1)。
若是當前線程是讀鎖則直接返回stamp值。若是上面條件都不知足,則轉換失敗。

鎖的升級(tryConvertToWriteLock)

public long tryConvertToWriteLock(long stamp) {
  long a = stamp & ABITS, m, s, next;

  // state的寫鎖標誌位和版本號一致(有可能寫鎖標誌位是0,便可能是讀鎖)
  while (((s = state) & SBITS) == (stamp & SBITS)) {

      // 還未添加任何鎖標識
      if ((m = s & ABITS) == 0L) {
          if (a != 0L)
              break;
          // 將寫鎖狀態位置爲0
          if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
              return next;
      }
      // 若是當前已是寫鎖狀態,則直接返回
      else if (m == WBIT) {
          if (a != m)
              break;
          return stamp;
      }
      // 若是當前是惟一讀鎖,則轉換爲寫鎖
      else if (m == RUNIT && a != 0L) {
          if (U.compareAndSwapLong(this, STATE, s,
                  next = s - RUNIT + WBIT))
              return next;
      }
      else
          break;
  }
  return 0L;
}

鎖的升級和鎖的降級的邏輯相似,這裏就再也不讀過介紹了。

總結以及注意

  • StampedLock沒有使用AQS,而是依靠本身實現的同步狀態(long state佔64位)和變異的CLH隊列
  • StampedLock使用state的低7位標識讀鎖數量(超出126的使用readerOverflow字段記錄),第8位標識寫鎖,高56位記錄鎖的版本,每次釋放/獲取寫鎖版本號都會加1
  • StampedLock中讀鎖和讀鎖不阻塞,讀鎖寫鎖相互阻塞,寫鎖和寫鎖也相互阻塞
  • StampedLock的連續多個讀鎖線程,只有第一個是在CLH隊列中,後面的會掛在第一個線程的cowait棧中
  • StampedLock喚醒第一個讀線程後,讀線程會喚醒它cowait棧的全部讀線程(acquireRead()方法中)
  • StampedLock不支持公平鎖,也不支持Condition
  • StampedLock支持鎖的降級和鎖的升級
  • StampedLock中的樂觀讀操做是無鎖的
  • StampedLock中使用了大量的自旋+CAS操做,適合持有鎖時間比較短的任務,持有鎖時間長的話不只會浪費CPU並且仍然會阻塞本身。
  • 線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會致使 CPU 飆升。因此,使用 StampedLock 必定不要調用中斷操做,若是須要支持中斷功能,必定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()
相關文章
相關標籤/搜索