併發系列(4)之 AbstractQueuedSynchronizer 源碼分析

本文將主要講述 AbstractQueuedSynchronizer 的內部結構和實現邏輯,在看本文以前最好先了解一下 CLH 隊列鎖,AbstractQueuedSynchronizer 就是根據 CLH 隊列鎖的變種實現的,由於自己 AQS 比較複雜不容易看清楚他自己的實現邏輯,因此查看 CLH 隊列鎖的實現,能夠幫助咱們理清楚他內部的關係;關於隊列鎖的內容能夠參考 ,CLH、MCS 隊列鎖簡介html

1、AQS 結構概述

在 JDK 中除 synchronized 內置鎖外,其餘的鎖和同步組件,基本能夠分爲:java

  1. 面向用戶的邏輯部分(對於鎖而言就是 Lock interface);
  2. 面向底層的線程調度部分;

AbstractQueuedSynchronizer 即同步隊列則是 Doug Lea 大神爲咱們提供的底層線程調度的封裝;AQS 自己是根據 CLH 隊列鎖實現的,這一點在註釋中有詳細的介紹,CLH、MCS 隊列鎖簡介node

clh

簡單來說,CLH 隊列鎖就是一個單項鍊表,想要獲取鎖的線程封裝爲節點添加到尾部,而後阻塞檢查前任節點的狀態 (必定要注意是前任節點,由於這樣更容易實現取消、超時等功能,同時這也是選擇 CLH 隊列鎖的緣由),而頭結點則是當前已經得到鎖的線程,其主要做用是通知後繼節點(也就是說在沒有發生競爭的狀況下,是不須要頭結點的,這一點後面會詳細分析);編程


而對於 AQS 的結構大體能夠表述爲:併發

clh


public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  protected AbstractQueuedSynchronizer() { }
  
  private transient volatile Node head;  // 懶加載,只有在發生競爭的時候纔會初始化;
  private transient volatile Node tail;  // 一樣懶加載;
  private volatile int state;  // 自定義的鎖狀態,能夠用來表示鎖的個數,以實現互斥鎖和共享鎖;
}

這裏的能夠直觀的看到鏈表結構的變化,其實next鏈表只是至關於遍歷的優化,而node節點的變化纔是主要的更新;ide

1. Node 結構

static final class Node {
  static final Node SHARED = new Node();  // 共享模式
  static final Node EXCLUSIVE = null;     // 互斥模式

  static final int CANCELLED =  1; // 表示線程取消獲取鎖
  static final int SIGNAL    = -1; // 表示後繼節點須要被喚醒
  static final int CONDITION = -2; // 表示線程位於條件隊列
  static final int PROPAGATE = -3; // 共享模式下節點的最終狀態,確保在doReleaseShared的時候將共享狀態繼續傳播下去

  /**
   * 節點狀態(初始爲0,使用CAS原則更新)
   * 互斥模式:0,SIGNAL,CANCELLED
   * 共享模式:0,SIGNAL,CANCELLED,PROPAGATE
   * 條件隊列:CONDITION
   */
  volatile int waitStatus;
  
  volatile Node prev;     // 前繼節點
  volatile Node next;     // 後繼節點
  volatile Thread thread; // 取鎖線程
  Node nextWaiter;        // 模式標識,取值:SHARED、EXCLUSIVE

  // Used by addWaiter,用於添加同隊隊列
  Node(Thread thread, Node mode) {   
    this.nextWaiter = mode;
    this.thread = thread;
  }

  // Used by Condition,同於添加條件隊列
  Node(Thread thread, int waitStatus) { 
    this.waitStatus = waitStatus;
    this.thread = thread;
  }
}

根據上面的代碼和註釋已經能夠看到 AQS 爲咱們提供了兩種模式,獨佔模式和共享模式(彼此獨立能夠同時使用);其中:oop

  • AbstractQueuedSynchronizer.state : 表示鎖的資源狀態,是咱們上面所說的面向用戶邏輯的部分;
  • Node.waitStatus : 表示節點在隊列中的狀態,是面向底層線程調度的部分;

這兩個變量必定要分清楚,在後面的代碼中也很容易弄混;源碼分析


2. AQS 運行邏輯

AQS 的運行邏輯能夠簡單表述爲:優化

AQS2

若是你熟悉 synchronized ,應該已經發現他們的運行邏輯實際上是差很少的,都用同步隊列和條件隊列,值得注意的是這裏的條件隊列和 Condition 一一對應,可能有多個;根據上圖能夠將 AQS 提供的功能總結爲:ui

  • 同步狀態的原子性管理;
  • 線程的阻塞與解除阻塞;
  • 隊列的管理;


3. 入隊

由於獨佔模式和共享模式彼此獨立能夠同時使用,因此在入隊的時候須要首先指定 Node 的類型,同時入隊的時候有競爭的可能,因此須要 CAS 入隊;

private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode); // SHARED、EXCLUSIVE
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

代碼中註釋也說明了,此處快速嘗試入隊,是一種優化手段,由於就通常狀況而言大多數時候是沒有競爭的;失敗後在循環入隊;

private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node())) // 此時head和tail才初始化
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

而對於出隊則稍微複雜一點,獨佔模式下直接出隊,由於沒有競爭;共享模式下,則須要 CAS 設置頭結點,由於可能對有多個節點同時出隊,同時還須要向後傳播狀態,保證後面的線程能夠及時得到鎖;此外還可能發生中斷或者異常出隊,此時則須要考慮頭尾的狀況,保證不會影響隊列的結構;具體內容將會在源碼中一次講解;


2、獨佔模式

1. 應用

public class Mutex implements Lock {
  private final Sync sync = new Sync();
  private static final int lock = 1;
  private static final int unlock = 0;

  @Override
  public void lock() {
    sync.acquire(lock);
  }

  @Override
  public boolean tryLock() {
    return sync.tryAcquire(lock);
  }

  @Override
  public void unlock() {
    sync.release(unlock);
  }

  private static class Sync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean isHeldExclusively() {
      return getState() == lock;
    }

    @Override
    public boolean tryAcquire(int acquires) {
      if (compareAndSetState(unlock, lock)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
      if (getState() == unlock)
        throw new IllegalMonitorStateException();
      setExclusiveOwnerThread(null);
      setState(unlock);
      return true;
    }
  }
}

注意代碼中特地將 AbstractQueuedSynchronizer.state 取值定爲lock\unlock ,主要是便於理解 state 的含義,在互斥鎖中能夠任意取值,固然也能夠是負數,可是通常狀況下令其表示爲鎖的資源數量(也就是0、1)和共享模式對比,比較容易理解;

2. 獲取鎖

對於獨佔模式取鎖而言有一共有四中方式,

  • tryAcquire: 快速嘗試取鎖,成功時返回true;這是獨佔模式必需要重寫的方法,其餘方式獲取鎖時,也會先嚐試快速獲取鎖;同時 tryAcquire 也就決定了,這個鎖時公平鎖/非公平鎖,可重入鎖/不重衝入鎖等;(好比上面的實例就是不可重入非公平鎖,具體分析之後還會詳細講解)
  • acquire: 不響應中斷,阻塞獲取鎖;
  • acquireInterruptibly: 響應中斷,阻塞獲取鎖;
  • tryAcquireNanos: 響應中斷,超時阻塞獲取鎖;


acquire 方法

流程圖:

acquire

源碼分析:

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&                             // 首先嚐試快速獲取鎖
       acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失敗後入隊,而後阻塞獲取
    selfInterrupt();                                  // 最後若是取鎖的有中斷,則從新設置中斷
}
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;           // 只要取鎖過程當中有一次中斷,返回時都要從新設置中斷
    for (;;) {
      final Node p = node.predecessor();   // 一直阻塞到前繼節點爲頭結點
      if (p == head && tryAcquire(arg)) {  // 獲取同步狀態
        setHead(node);                     // 設置頭結點,此時頭部不存在競爭,直接設置
        // next 主要起優化做用,而且在入隊的時候next不是CAS設置
        // 也就是經過next不必定能夠準確取到後繼節點,因此在喚醒的時候不能依賴next,須要反向遍歷
        p.next = null; // help GC          
        failed = false;
        return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) && // 判斷並整理前繼節點
        parkAndCheckInterrupt())                   // 當循環最多第二次的時候,必然阻塞
        interrupted = true;
    }
  } finally {
    if (failed)  // 異常時取消獲取
      cancelAcquire(node);
  }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL) return true;
  if (ws > 0) {  // 大於0說明,前繼節點異常或者取消獲取,直接跳過;
    do {
      node.prev = pred = pred.prev;  // 跳過pred並創建鏈接
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  // 標記後繼節點須要喚醒
  }
  return false;
}

其中 node.prev = pred = pred.prev; 相關的內存分析能夠查看 JAVA 連等賦值問題


acquireInterruptibly 方法

流程圖:

acquireInterruptibly

源碼分析:

public final void acquireInterruptibly(int arg) throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException();  // 中斷退出
  if (!tryAcquire(arg))           // 獲取同步狀態
    doAcquireInterruptibly(arg);  // 中斷獲取
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
  final Node node = addWaiter(Node.EXCLUSIVE);   // 加入隊尾
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&   // 判斷並整理前繼節點
        parkAndCheckInterrupt())                     // 等待
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}


tryAcquireNanos 方法

流程圖:

tryAcquireNanos

源碼分析:

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException();
  return tryAcquire(arg) ||
    doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  if (nanosTimeout <= 0L) return false;
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
      }
      nanosTimeout = deadline - System.nanoTime();
      if (nanosTimeout <= 0L) return false;          // 超時退出
      if (shouldParkAfterFailedAcquire(p, node) &&
        nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
      if (Thread.interrupted())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}


3. 釋放鎖

釋放鎖時,判斷有後繼節點須要喚醒,則喚醒後繼節點,而後退出;有喚醒的後繼節點從新設置頭結點,並標記狀態

public final boolean release(int arg) {
  if (tryRelease(arg)) {   // 由用戶重寫,嘗試釋放
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);  // 喚醒後繼節點
    return true;
  }
  return false;
}


3、共享模式

1. 應用

public class ShareLock implements Lock {
  private Syn sync;

  public ShareLock(int count) { this.sync = new Syn(count); }

  @Override
  public void lock() { sync.acquireShared(1); }

  @Override
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  @Override
  public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; }

  @Override
  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
  }

  @Override
  public void unlock() { sync.releaseShared(1); }

  @Override
  public Condition newCondition() { throw new UnsupportedOperationException(); }

  private static final class Syn extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 5854536238831876527L;
    Syn(int count) {
      if (count <= 0) {
        throw new IllegalArgumentException("count must large than zero.");
      }
      setState(count);
    }

    @Override
    public int tryAcquireShared(int reduceCount) {
      for (; ; ) {
        int current = getState();
        int newCount = current - reduceCount;
        //若是新的狀態小於0 則返回值,則表示沒有鎖資源,直接返回
        if (newCount < 0 || compareAndSetState(current, newCount)) {
          return newCount;
        }
      }
    }

    @Override
    public boolean tryReleaseShared(int retrunCount) {
      for (; ; ) {
        int current = getState();
        int newCount = current + retrunCount;
        if (compareAndSetState(current, newCount)) {
          return true;
        }
      }
    }
  }
}

上述代碼中的 AbstractQueuedSynchronizer.state 表示鎖的資源數,可是仍然是不可重入的;


2. 獲取鎖

一樣對於共享模式取鎖也有四中方式:

  • tryAcquireShared: 快速嘗試取鎖,由用戶重寫
  • acquireShared: 不響應中斷,阻塞獲取鎖;
  • acquireSharedInterruptibly: 響應中斷,阻塞獲取鎖;
  • tryAcquireSharedNanos: 響應中斷,超時阻塞獲取鎖;

tryAcquireShared 方法

@Override
public int tryAcquireShared(int reduceCount) {
  for (; ; ) {
    int current = getState();
    int newCount = current - reduceCount;
    //若是新的狀態小於0 則返回值,則表示沒有鎖資源,直接返回
    if (newCount < 0 || compareAndSetState(current, newCount)) {
      return newCount;
    }
  }
}

須要注意的是 tryAcquireShared 方法是快速嘗試獲取鎖,並更新鎖狀態,若是失敗則必然鎖資源不足,返回負值;

acquireShared 方法

public final void acquireShared(int arg) {
  if (tryAcquireShared(arg) < 0)  // 快速獲取失敗
    doAcquireShared(arg);         // 阻塞獲取鎖
}
private void doAcquireShared(int arg) {
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);     // 設置頭結點,並是狀況將信號傳播下去
          p.next = null; // help GC
          if (interrupted) selfInterrupt(); // 從新設置中斷狀態
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}
// propagate 表示線程獲取鎖後,共享鎖剩餘的鎖資源
private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // Record old head for check below
  setHead(node);
  
  // propagate > 0 :表示還有剩餘的資源
  // h.waitStatus < 0 : 表示後繼節點須要被喚醒
  // 其他還作了不少保守判斷,確保後面的節點能及時那到鎖
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
      doReleaseShared();  // 喚醒後繼節點
  }
}

根據上面的代碼能夠看到,共享模式和獨佔模式獲取鎖的主要區別:

  • 共享模式能夠有多個鎖
  • 設置頭結點的時候,同時還要將狀態傳播下去

其他的思路和獨佔模式差很少,他家能夠本身看源碼;

3. 釋放鎖

一樣 tryReleaseShared 是由用戶本身重寫的,這裏須要注意的是若是不能確保釋放成功(由於共享模式釋放鎖的時候可能有競爭,因此可能失敗),則在外層 Lock 接口使用的時候,就須要額外處理;

@Override
public boolean tryReleaseShared(int retrunCount) {
  for (; ; ) {
    int current = getState();
    int newCount = current + retrunCount;
    if (compareAndSetState(current, newCount)) {
      return true;
    }
  }
}

releaseShared 方法

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {  // 嘗試取鎖成功,此時鎖資源已從新設置
    doReleaseShared();          // 喚醒後繼節點
    return true;
  }
  return false;
}

doReleaseShared 方法必然執行兩次,

  • 第一次頭結點釋放鎖,而後喚醒後繼節點
  • 第二次後繼設置頭結點

最終使得頭結點的狀態必然是 PROPAGATE

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;      // loop to recheck cases
        unparkSuccessor(h);
      }
      else if (ws == 0 &&
           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;        // loop on failed CAS
    }
    if (h == head)       // loop if head changed
      break;
  }
}

4、條件隊列

1. ConditionObject 結構

condition

public class ConditionObject implements Condition, java.io.Serializable {
  private transient Node firstWaiter;
  private transient Node lastWaiter;
  ...
}

如代碼所示條件隊列是一個由 Node 組成的鏈表,注意這裏的鏈表不一樣於同步隊列,是經過 nextWaiter 鏈接的,在同步隊列中 nextWaiter 用來表示獨佔和共享模式,因此區分條件隊列的方法就有兩個:

  • Node.waitStatus = Node.CONDITION;
  • Node.next = null & Node.prev= null;

2. await

public final void await() throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException();
  Node node = addConditionWaiter();     // 添加節點到條件隊列
  int savedState = fullyRelease(node);  // 確保釋放鎖,並喚醒後繼節點
  int interruptMode = 0;
  while (!isOnSyncQueue(node)) {        // node 不在同步隊列中
    LockSupport.park(this);             // 阻塞
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}

3. signal

public final void signal() {
  if (!isHeldExclusively()) throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  if (first != null)  
    doSignal(first);  // 從頭結點一次喚醒
}

private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    first.nextWaiter = null;
  } while (!transferForSignal(first) &&  // 將節點移動到同步節點中
       (first = firstWaiter) != null);
}

由於篇幅有點長了,因此條件隊列講的也就相對簡單了一點,可是大致的思路仍是講了;

總結

  • AbstractQueuedSynchronizer 經過私有變量繼承方式使用
  • 觀察 AbstractQueuedSynchronizer ,其實和 synchronized 的結構基本相同,可是 synchronized 還會自動根據使用狀況進行鎖升級
  • 此外本文的主要參考資料是《java 併發編程的藝術》,有興趣的能夠自行查看;
相關文章
相關標籤/搜索