JUC同步器框架AbstractQueuedSynchronizer源碼圖文分析

JUC同步器框架AbstractQueuedSynchronizer源碼圖文分析

前提

Doug Lea大神在編寫JUC(java.util.concurrent)包的時候引入了java.util.concurrent.locks.AbstractQueuedSynchronizer,Abstract Queued Synchronizer,也就是"基於隊列實現的抽象同步器",通常咱們稱之爲AQS。其實Doug Lea大神編寫AQS是有嚴謹的理論基礎的,他的我的博客上有一篇論文《The java.util.concurrent Synchronizer Framework》,文章在http://ifeve.com上能夠找到相關的譯文(《JUC同步器框架》),若是想要深刻研究AQS必需要理解一下該論文的內容,而後詳細分析一下AQS的源碼實現。本文在閱讀AQS源碼的時候選用的JDK版本是JDK11。java

AQS的主要功能

AQS是JUC包中用於構建鎖或者其餘同步組件(信號量、事件等)的基礎框架類。AQS從它的實現上看主要提供了下面的功能:node

  • 同步狀態的原子性管理。
  • 線程的阻塞和解除阻塞。
  • 提供阻塞線程的存儲隊列。

基於這三大功能,衍生出下面的附加功能:算法

  • 經過中斷實現的任務取消,基於線程中斷實現。
  • 可選的超時設置,也就是調用者能夠選擇放棄等待。
  • 定義了Condition接口,用於支持管程形式的await/signal/signalAll操做,代替了Object類基於JNI提供的wait/notify/notifyAll。

AQS還根據同步狀態的不一樣管理方式區分爲兩種不一樣的實現:獨佔狀態的同步器共享狀態的同步器安全

JUC同步器框架原理

The java.util.concurrent Synchronizer Framework》一文中其實有說起到同步器框架的僞代碼:數據結構

// acquire操做以下:
while (synchronization state does not allow acquire) {
    enqueue current thread if not already queued;
    possibly block current thread;
}
dequeue current thread if it was queued;

//release操做以下:
update synchronization state;
if (state may permit a blocked thread to acquire){
    unblock one or more queued threads;
}

翻譯一下:多線程

// acquire操做以下:
while(同步狀態申請獲取失敗){
    if(當前線程未進入等待隊列){
        當前線程放入等待隊列;
    }
    嘗試阻塞當前線程;
}
當前線程移出等待隊列

//release操做以下:
更新同步狀態
if(同步狀態足夠容許一個阻塞的線程申請獲取){
    解除一個或者多個等待隊列中的線程的阻塞狀態;
}

爲了實現上述操做,須要下面三個基本組件的相互協做:併發

  • 同步狀態的原子性管理。
  • 等待隊列的管理。
  • 線程的阻塞與解除阻塞。

其實基本原理很簡單,可是爲了應對複雜的併發場景和併發場景下程序執行的正確性,同步器框架在上面的acquire操做和release操做中使用了死循環和CAS等操做,不少時候會讓人感受邏輯過於複雜。框架

同步狀態管理

AQS內部內部定義了一個32位整型的state變量用於保存同步狀態:ide

/**
 * The synchronization state.
 */
private volatile int state;

// 獲取state
protected final int getState() {
    return state;
}

// 直接覆蓋設置state
protected final void setState(int newState) {
    state = newState;
}

// CAS設置state
protected final boolean compareAndSetState(int expect, int update) {
    return STATE.compareAndSet(this, expect, update);
}

同步狀態state在不一樣的實現中能夠有不一樣的做用或者表示意義,它能夠表明資源數、鎖狀態等等,遇到具體的場景咱們再分析它表示的意義。函數

CLH隊列變體

CLH鎖即Craig, Landin, and Hagersten (CLH) locks,由於它底層是基於隊列實現,通常也稱爲CLH隊列鎖。CLH鎖也是一種基於鏈表的可擴展、高性能、公平的自旋鎖,申請線程僅僅在本地變量上自旋,它不斷輪詢前驅的狀態,假設發現前驅釋放了鎖就結束自旋。從實現上看,CLH鎖是一種自旋鎖,能確保無飢餓性,提供先來先服務的公平性。先看簡單的CLH鎖的一個簡單實現:

public class CLHLock implements Lock {

    AtomicReference<QueueNode> tail = new AtomicReference<>(new QueueNode());

    ThreadLocal<QueueNode> pred;
    ThreadLocal<QueueNode> current;

    public CLHLock() {
        current = ThreadLocal.withInitial(QueueNode::new);
        pred = ThreadLocal.withInitial(() -> null);
    }

    @Override
    public void lock() {
        QueueNode node = current.get();
        node.locked = true;
        QueueNode pred = tail.getAndSet(node);
        this.pred.set(pred);
        while (pred.locked) {
        }
    }

    @Override
    public void unlock() {
        QueueNode node = current.get();
        node.locked = false;
        current.set(this.pred.get());
    }

    static class QueueNode {

        boolean locked;
    }

    // 忽略其餘接口方法的實現
}

上面是一個簡單的CLH隊列鎖的實現,內部類QueueNode只使用了一個簡單的布爾值locked屬性記錄了每一個線程的狀態,若是該屬性爲true,則相應的線程要麼已經獲取到鎖,要麼正在等待鎖,若是該屬性爲false,則相應的線程已經釋放了鎖。新來的想要獲取鎖的線程必須對tail屬性調用getAndSet()方法,使得自身成爲隊列的尾部,同時獲得一個指向前驅節點的引用pred,最後線程所在節點在其前驅節點的locked屬性上自旋,值得前驅節點釋放鎖。上面的實現是沒法運行的,由於一旦自旋就會進入死循環致使CPU飆升,能夠嘗試使用下面將要提到的LockSupport進行改造。

CLH隊列鎖本質是使用隊列(其實是單向鏈表)存放等待獲取鎖的線程,等待的線程老是在其所在節點的前驅節點的狀態上自旋,直到前驅節點釋放資源。從實際來看,過分自旋帶來的CPU性能損耗比較大,並非理想的線程等待隊列實現

基於原始的CLH隊列鎖中提供的等待隊列的基本原理,AQS實現一種了CLH鎖隊列的變體(variant)AQS類的protected修飾的構造函數裏面有一大段註釋用於說明AQS實現的等待隊列的細節事項,這裏列舉幾點重要的:

  • AQS實現的等待隊列沒有直接使用CLH鎖隊列,可是參考了其設計思路,等待節點會保存前驅節點中線程的信息,內部也會維護一個控制線程阻塞的狀態值。
  • 每一個節點都設計爲一個持有單獨的等待線程而且"帶有具體的通知方式"的監視器,這裏所謂通知方式就是自定義喚醒阻塞線程的方式而已。
  • 一個線程是等待隊列中的第一個等待節點的持有線程會嘗試獲取鎖,可是並不意味着它必定可以獲取鎖成功(這裏的意思是存在公平和非公平的實現),獲取失敗就要從新等待。
  • 等待隊列中的節點經過prev屬性鏈接前驅節點,經過next屬性鏈接後繼節點,簡單來講,就是雙向鏈表的設計。
  • CLH隊列本應該須要一個虛擬的頭節點,可是在AQS中沒有直接提供虛擬的頭節點,而是延遲到第一次競爭出現的時候懶建立虛擬的頭節點(其實也會建立尾節點,初始化時頭尾節點是同一個節點)。
  • Condition(條件)等待隊列中的阻塞線程使用的是相同的Node結構,可是提供了另外一個鏈表用來存放,Condition等待隊列的實現比非Condition等待隊列複雜。

線程阻塞與喚醒

線程的阻塞和喚醒在JDK1.5以前,通常只能依賴於Object類提供的wait()notify()notifyAll()方法,它們都是JNI方法,由JVM提供實現,而且它們必須運行在獲取監視器鎖的代碼塊內(synchronized代碼塊中),這個侷限性先不談性能上的問題,代碼的簡潔性和靈活性是比較低的。JDK1.5引入了LockSupport類,底層是基於Unsafe類的park()unpark()方法,提供了線程阻塞和喚醒的功能,它的機制有點像只有一個容許使用資源的信號量java.util.concurrent.Semaphore,也就是一個線程只能經過park()方法阻塞一次,只能調用unpark()方法解除調用阻塞一次,線程就會喚醒(屢次調用unpark()方法也只會喚醒一次),能夠想象是內部維護了一個0-1的計數器。

LockSupport類若是使用得好,能夠提供更靈活的編碼方式,這裏舉個簡單的使用例子:

public class LockSupportMain implements Runnable {

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private Thread thread;

    private void setThread(Thread thread) {
        this.thread = thread;
    }

    public static void main(String[] args) throws Exception {
        LockSupportMain main = new LockSupportMain();
        Thread thread = new Thread(main, "LockSupportMain");
        main.setThread(thread);
        thread.start();
        Thread.sleep(2000);
        main.unpark();
        Thread.sleep(2000);
    }

    @Override
    public void run() {
        System.out.println(String.format("%s-步入run方法,線程名稱:%s", FORMATTER.format(LocalDateTime.now()),
                Thread.currentThread().getName()));
        LockSupport.park();
        System.out.println(String.format("%s-解除阻塞,線程繼續執行,線程名稱:%s", FORMATTER.format(LocalDateTime.now()),
                Thread.currentThread().getName()));
    }

    private void unpark() {
        LockSupport.unpark(thread);
    }
}
// 某個時刻的執行結果以下:
2019-02-25 00:39:57.780-步入run方法,線程名稱:LockSupportMain
2019-02-25 00:39:59.767-解除阻塞,線程繼續執行,線程名稱:LockSupportMain

LockSupportpark()方法也有帶超時的變體版本方法,有些適合使用阻塞超時的場景不妨可使用。

獨佔線程的保存

AbstractOwnableSynchronizerAQS的父類,一個同步器框架有可能在一個時刻被某一個線程獨佔,AbstractOwnableSynchronizer就是爲全部的同步器實現和鎖相關實現提供了基礎的保存、獲取和設置獨佔線程的功能,這個類的源碼很簡單:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

它就提供了一個保存獨佔線程的變量對應的Setter和Getter方法,方法都是final修飾的,子類只能使用不能覆蓋。

CLH隊列變體的實現

這裏先重點分析一下AQS中等待隊列的節點AQS$Node的源碼:

static final class Node {
   // 標記一個節點處於共享模式下的等待
   static final Node SHARED = new Node();
   // 標記一個節點處於獨佔模式下的等待
   static final Node EXCLUSIVE = null;
   // 取消狀態
   static final int CANCELLED =  1;
   // 喚醒狀態
   static final int SIGNAL    = -1;
   // 條件等待狀態
   static final int CONDITION = -2;
   // 傳播狀態
   static final int PROPAGATE = -3;
   // 等待狀態,初始值爲0,其餘可選值是上面的4個值
   volatile int waitStatus;
   // 當前節點前驅節點的引用
   volatile Node prev;
   // 當前節點後繼節點的引用
   volatile Node next;
   // 當前節點持有的線程,多是阻塞中等待喚醒的線程
   volatile Thread thread;
   // 下一個等待節點
   Node nextWaiter;
   // 當前操做的節點是否處於共享模式
   final boolean isShared() {
      return nextWaiter == SHARED;
   }
   // 獲取當前節點的前驅節點,確保前驅節點必須存在,不然拋出NPE  
   final Node predecessor() {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    
    // 空節點,主要是首次建立隊列的時候建立的頭和尾節點使用
    Node() {}

    // 設置下一個等待節點,設置持有線程爲當前線程
    Node(Node nextWaiter) {
        this.nextWaiter = nextWaiter;
        THREAD.set(this, Thread.currentThread());
    }

    // 設置waitStatus,設置持有線程爲當前線程
    Node(int waitStatus) {
        WAITSTATUS.set(this, waitStatus);
        THREAD.set(this, Thread.currentThread());
    }

    // CAS更新waitStatus  
    final boolean compareAndSetWaitStatus(int expect, int update) {
        return WAITSTATUS.compareAndSet(this, expect, update);
    }
    // CAS設置後繼節點
    final boolean compareAndSetNext(Node expect, Node update) {
        return NEXT.compareAndSet(this, expect, update);
    }
    // 設置前驅節點
    final void setPrevRelaxed(Node p) {
        PREV.set(this, p);
    }

    // 下面是變量句柄的實現,在VarHandle出現以前使用的是Unsafe,其實底層仍是照樣使用Unsafe
    private static final VarHandle NEXT;
    private static final VarHandle PREV;
    private static final VarHandle THREAD;
    private static final VarHandle WAITSTATUS;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            NEXT = l.findVarHandle(Node.class, "next", Node.class);
            PREV = l.findVarHandle(Node.class, "prev", Node.class);
            THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
            WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }     
}

其中,變量句柄(VarHandle)是JDK9引用的新特性,其實底層依賴的仍是Unsafe的方法,整體和JDK8的實現是基本一致。這裏須要關注一下Node裏面的幾個屬性:

  • waitStatus:當前Node實例的等待狀態,可選值有5個。
    1. 初始值整數0:當前節點若是不指定初始化狀態值,默認值就是0,側面說明節點正在等待隊列中處於等待狀態。
    2. Node#CANCELLED整數值1:表示當前節點實例由於超時或者線程中斷而被取消,等待中的節點永遠不會處於此狀態,被取消的節點中的線程實例不會阻塞。
    3. Node#SIGNAL整數值-1:表示當前節點的後繼節點是(或即將是)阻塞的(經過park),當它釋放或取消時,當前節點必須unpark它的後繼節點。
    4. Node#CONDITION整數值-2:表示當前節點是條件隊列中的一個節點,當它轉換爲同步隊列中的節點的時候,狀態會被從新設置爲0。
    5. Node#PROPAGATE整數值-3:此狀態值一般只設置到調用了doReleaseShared()方法的頭節點,確保releaseShared()方法的調用能夠傳播到其餘的全部節點,簡單理解就是共享模式下節點釋放的傳遞標記。
  • prev、next:當前Node實例的前驅節點引用和後繼節點引用。
  • thread:當前Node實例持有的線程實例引用。
  • nextWaiter:這個值是一個比較容易使人生疑的值,雖然表面上它稱爲"下一個等待的節點",可是實際上它有三種取值的狀況。
    1. 值爲靜態實例Node.EXCLUSIVE(也就是null),表明當前的Node實例是獨佔模式。
    2. 值爲靜態實例Node.SHARED,表明當前的Node實例是共享模式。
    3. 值爲非Node.EXCLUSIVENode.SHARED的其餘節點實例,表明Condition等待隊列中當前節點的下一個等待節點

Node類的等待狀態waitStatus理解起來是十分費勁的,下面分析其餘源碼的時候會標識此狀態變化的時機。

其實上面的Node類能夠直接拷貝出來當成一個新建的類,而後嘗試構建一個雙向鏈表自行調試,這樣子就能深入它的數據結構。例如:

public class AqsNode {

    static final AqsNode SHARED = new AqsNode();
    static final AqsNode EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile AqsNode prev;

    volatile AqsNode next;

    volatile Thread thread;

    AqsNode nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final AqsNode predecessor() {
        AqsNode p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    AqsNode() {
    }

    AqsNode(AqsNode nextWaiter) {
        this.nextWaiter = nextWaiter;
        THREAD.set(this, Thread.currentThread());
    }

    AqsNode(int waitStatus) {
        WAITSTATUS.set(this, waitStatus);
        THREAD.set(this, Thread.currentThread());
    }

    final boolean compareAndSetWaitStatus(int expect, int update) {
        return WAITSTATUS.compareAndSet(this, expect, update);
    }

    final boolean compareAndSetNext(AqsNode expect, AqsNode update) {
        return NEXT.compareAndSet(this, expect, update);
    }

    final void setPrevRelaxed(AqsNode p) {
        PREV.set(this, p);
    }

    private static final VarHandle NEXT;
    private static final VarHandle PREV;
    private static final VarHandle THREAD;
    private static final VarHandle WAITSTATUS;

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            NEXT = l.findVarHandle(AqsNode.class, "next", AqsNode.class);
            PREV = l.findVarHandle(AqsNode.class, "prev", AqsNode.class);
            THREAD = l.findVarHandle(AqsNode.class, "thread", Thread.class);
            WAITSTATUS = l.findVarHandle(AqsNode.class, "waitStatus", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static void main(String[] args) throws Exception {
        AqsNode head = new AqsNode();
        AqsNode next = new AqsNode(AqsNode.EXCLUSIVE);
        head.next = next;
        next.prev = head;
        AqsNode tail = new AqsNode(AqsNode.EXCLUSIVE);
        next.next = tail;
        tail.prev = next;
        List<Thread> threads = new ArrayList<>();
        for (AqsNode node = head; node != null; node = node.next) {
            threads.add(node.thread);
        }
        System.out.println(threads);
    }
}
// 某次執行的輸出:
[null, Thread[main,5,main], Thread[main,5,main]]

實際上,AQS中一共存在兩種等待隊列,其中一種是普通的同步等待隊列,這裏命名爲Sync-Queue,另外一種是基於Sync-Queue實現的條件等待隊列,這裏命名爲Condition-Queue。

Sync-Queue

前面已經介紹完AQS的同步等待隊列節點類,下面重點分析一下同步等待隊列的相關源碼,下文的Sync隊列、同步隊列和同步等待隊列是同一個東西。首先,咱們經過分析Node節點得知Sync隊列必定是雙向鏈表,AQS中有兩個瞬時成員變量用來存放頭節點和尾節點:

// 頭節點引用
private transient volatile Node head;
// 尾節點引用
private transient volatile Node tail;

// 變量句柄相關,用於CAS操做頭尾節點
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;

static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
        HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
        TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
    } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
    }
    // 確保LockSupport類已經初始化 - 這裏應該是爲了修復以前一個由於LockSupport未初始化致使的BUG
    Class<?> ensureLoaded = LockSupport.class;
}

// 初始化同步隊列,注意初始化同步隊列的時候,頭尾節點都是指向同一個新的Node實例
private final void initializeSyncQueue() {
    Node h;
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

// CAS設置同步隊列的尾節點
private final boolean compareAndSetTail(Node expect, Node update) {
    return TAIL.compareAndSet(this, expect, update);
}

// 設置頭節點,重點注意這裏:傳入的節點設置成頭節點以後,前驅節點和持有的線程會置爲null,這是由於:
// 1.頭節點必定沒有前驅節點。
// 2.當節點被設置爲頭節點,它所在的線程必定是已經解除了阻塞。
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

當前線程加入同步等待隊列和同步等待隊列的初始化是同一個方法,前文提到過:同步等待隊列的初始化會延遲到第一次可能出現競爭的狀況,這是爲了不無謂的資源浪費,具體方法是addWaiter(Node mode)

// 添加等待節點到同步等待隊列,實際上初始化隊列也是這個方法完成的
private Node addWaiter(Node mode) {
    // 基於當前線程建立一個新節點,節點的模式由調用者決定
    Node node = new Node(mode);
    for (;;) {
        Node oldTail = tail;
       // 尾節點不爲空說明隊列已經初始化過,則把新節點加入到鏈表中,做爲新的尾節點,創建和前驅節點的關聯關係
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
        // 尾節點爲空說明隊列還沒有初始化過,進行一次初始化操做
            initializeSyncQueue();
        }
    }
}

在首次調用addWaiter()方法,死循環至少執行兩輪再跳出,由於同步隊列必須初始化完成後(第一輪循環),而後再把當前線程所在的新節點實例添加到等待隊列中再返回(第二輪循環)當前的節點,這裏須要注意的是新加入同步等待隊列的節點必定是添加到隊列的尾部而且會更新AQS中的tail屬性爲最新入隊的節點實例

假設咱們使用Node.EXCLUSIVE模式入隊列,手上有三個線程分別是thread-一、thread-2和thread-3,線程入隊的時候都處於阻塞狀態,模擬一下依次調用上面的入隊方法的同步隊列的整個鏈表的狀態。

先是線程thread-1加入等待隊列:

j-a-q-s-1

接着是線程thread-2加入等待隊列:

j-a-q-s-2

最後是線程thread-3加入等待隊列:

j-a-q-s-3

若是仔細研究會發現,若是全部的入隊線程都處於阻塞狀態的話,新入隊的線程老是添加到隊列的tail節點,阻塞的線程老是"爭搶"着成爲head節點,這一點和CLH隊列鎖的阻塞線程老是基於前驅節點自旋以獲取鎖的思路是一致的。下面將會分析的獨佔模式與共享模式,線程加入等待隊列都是經過addWaiter()方法

Condition-Queue

前面已經相對詳細地介紹過同步等待隊列,在AQS中還存在另一種相對特殊和複雜的等待隊列-條件等待隊列。介紹條件等待隊列以前,要先介紹java.util.concurrent.locks.Condition接口。

public interface Condition {
    
    // 當前線程進入等待狀態直到被喚醒或者中斷
    void await() throws InterruptedException;
    // 當前線程進入等待狀態,不響應中斷,阻塞直到被喚醒
    void awaitUninterruptibly();
    // 當前線程進入等待狀態直到被喚醒或者中斷,阻塞帶時間限制
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // 當前線程進入等待狀態直到被喚醒或者中斷,阻塞帶時間限制
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 當前線程進入等待狀態直到被喚醒或者中斷,阻塞帶時間限制
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // 喚醒單個阻塞線程
    void signal();
    // 喚醒全部阻塞線程
    void signalAll();
}

Condition能夠理解爲Object中的wait()notify()notifyAll()的替代品,由於Object中的相應方法是JNI(Native)方法,由JVM實現,對使用者而言並非十分友好(可能須要感知JVM的源碼實現),而Condition是基於數據結構和相應算法實現對應的功能,咱們能夠從源碼上分析其實現。

Condition的實現類是AQS的公有內部類ConditionObjectConditionObject提供的入隊列方法以下:

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */ - 條件隊列的第一個節點
    private transient Node firstWaiter;
    /** Last node of condition queue. */ - 條件隊列的最後一個節點
    private transient Node lastWaiter;
    // 公有構造函數
    public ConditionObject() { }
    // 添加條件等待節點
    private Node addConditionWaiter() {
        // 這裏作一次判斷,當前線程必須步入此同步器實例
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 臨時節點t賦值爲lastWaiter引用
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        // 最後一個節點不爲條件等待狀態,則是取消狀態
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 解除全部取消等待的節點的鏈接
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 基於當前線程新創建一個條件等待類型的節點
        Node node = new Node(Node.CONDITION);
        // 首次建立Condition的時候,最後一個節點臨時引用t爲null,則把第一個節點置爲新建的節點
        if (t == null)
            firstWaiter = node;
        else
            // 已經存在第一個節點,則經過nextWaiter鏈接新的節點
            t.nextWaiter = node;
        // 最後一個節點的引用更新爲新節點的引用    
        lastWaiter = node;
        return node;
    } 
    // 從條件等待隊列解除全部取消等待的節點的鏈接,其實就是全部取消節點移除的操做,涉及到雙向鏈表的斷鏈操做、第一個和最後一個節點的引用更新
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            // 注意這裏等待狀態的判斷
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    } 
    // 當前同步器實例持有的線程是否當前線程(currentThread())
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    } 

// 暫時不分析其餘方法             
}

實際上,Condition的全部await()方法變體都調用addConditionWaiter()添加阻塞線程到條件隊列中。咱們按照分析同步等待隊列的狀況,分析一下條件等待隊列。正常狀況下,假設有2個線程thread-1和thread-2進入條件等待隊列,都處於阻塞狀態。

先是thread-1進入條件隊列:

j-a-q-s-4

而後是thread-2進入條件隊列:

j-a-q-s-5

條件等待隊列看起來也並不複雜,可是它並非單獨存在和使用的,通常依賴於同步等待隊列,下面的一節分析Condition的實現的時候再詳細分析。

獨佔模式與共享模式

前文說起到,同步器涉及到獨佔模型和共享模式。下面就針對這兩種模式詳細分析一下AQS的具體實現源碼。

獨佔模式

AQS同步器若是使用獨佔(EXCLUSIVE)模式,那麼意味着同一個時刻,只有節點所在一個線程獲取(acuqire)原子狀態status成功,此時該線程能夠從阻塞狀態解除繼續運行,而同步等待隊列中的其餘節點持有的線程依然處於阻塞狀態。獨佔模式同步器的功能主要由下面的四個方法提供:

  • acquire(int arg);申請獲取arg個原子狀態status(申請成功能夠簡單理解爲status = status - arg)。
  • acquireInterruptibly(int arg):申請獲取arg個原子狀態status,響應線程中斷。
  • tryAcquireNanos(int arg, long nanosTimeout):申請獲取arg個原子狀態status,帶超時的版本。
  • release(int arg):釋放arg個原子狀態status(釋放成功能夠簡單理解爲status = status + arg)。

獨佔模式下,AQS同步器實例初始化時候傳入的status值,能夠簡單理解爲"容許申請的資源數量的上限值",下面的acquire類型的方法暫時稱爲"獲取資源",而release方法暫時稱爲"釋放資源"。接着咱們分析前面提到的四個方法的源碼,先看acquire(int arg)

public final void acquire(int arg) {
    // 獲取資源成功或者新增一個獨佔類型節點到同步等待隊列成功則直接返回,不然中斷當前線程
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 此方法必須又子類覆蓋,用於決定是否獲取資源成功
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 中斷當前線程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

// 不可中斷的獨佔模式下,同步等待隊列中的線程獲取資源的方法
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            // 獲取新入隊節點的前驅節點
            final Node p = node.predecessor();
            // 前驅節點爲頭節點而且嘗試獲取資源成功,也就是每一輪循環都會調用tryAcquire嘗試獲取資源,除非阻塞或者跳出循環
            if (p == head && tryAcquire(arg)) {
                // 設置新入隊節點爲頭節點,原來的節點會從隊列中斷開
                setHead(node);
                p.next = null; // help GC
                return interrupted;   // <== 注意,這個位置是跳出死循環的惟一位置
            }
            // 判斷是否須要阻塞當前獲取資源失敗的節點中持有的線程
            if (shouldParkAfterFailedAcquire(p, node))
                // 阻塞當前線程,若是被喚醒則返回並清空線程的中斷標記
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

/**
 * 檢查而且更新獲取資源失敗的節點的狀態,返回值決定線程是否須要被阻塞。
 * 這個方法是全部循環獲取資源方法中信號控制的主要方法
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 這裏記住ws是當前處理節點的前驅節點的等待狀態
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驅節點狀態設置成Node.SIGNAL成功,等待被release調用釋放,後繼節點能夠安全地進入阻塞狀態
        return true;
    if (ws > 0) {
        // ws大於0只有一種狀況Node.CANCELLED,說明前驅節點已經取消獲取資源,
        // 這個時候會把全部這類型取消的前驅節點移除,找到一個非取消的節點從新經過next引用鏈接當前節點
        do {
           node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 其餘等待狀態直接修改前驅節點等待狀態爲Node.SIGNAL
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

// 阻塞當前線程,獲取而且重置線程的中斷標記位
private final boolean parkAndCheckInterrupt() {
    // 這個就是阻塞線程的實現,依賴Unsafe的API
    LockSupport.park(this);
    return Thread.interrupted();
}

上面的代碼雖然看起來能基本理解,可是最好用圖推敲一下"空間上的變化":

j-a-q-s-6

j-a-q-s-7

接着分析一下release(int arg)的實現:

// 釋放資源
public final boolean release(int arg) {
    // 嘗試釋放資源
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 嘗試釋放資源,獨佔模式下,嘗試經過從新設置status的值從而實現釋放資源的功能
// 這個方法必須由子類實現
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// 解除傳入節點(通常是頭節點)的第一個後繼節點的阻塞狀態,當前處理節點的等待狀態會被CAS更新爲0
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 當前處理的節點(通常是頭節點)狀態小於0則直接CAS更新爲0
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 若是節點的第一個後繼節點爲null或者等待狀態大於0(取消),則從等待隊列的尾節點向前遍歷,
        // 找到最後一個不爲null,而且等待狀態小於等於0的節點
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // 解除上面的搜索到的節點的阻塞狀態
    if (s != null)
        LockSupport.unpark(s.thread);
}

接着用上面的圖:

j-a-q-s-8

上面圖中thread-2晉升爲頭節點的第一個後繼節點,等待下一個release()釋放資源喚醒之就能晉升爲頭節點,一旦晉升爲頭節點也就是意味着能夠解除阻塞繼續運行。接着咱們能夠看acquire()的響應中斷版本和帶超時的版本。先看acquireInterruptibly(int arg)

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
    // 獲取而且清空線程中斷標記位,若是是中斷狀態則直接拋InterruptedException異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 若是獲取資源失敗
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

// 獨佔模式下響應中斷的獲取資源方法
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // 基於當前線程新增一個獨佔的Node節點進入同步等待隊列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            // 獲取資源失敗進入阻塞狀態
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    // 解除阻塞後直接拋出InterruptedException異常
                    throw new InterruptedException();
            }
         } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
    }
}

doAcquireInterruptibly(int arg)方法和acquire(int arg)相似,最大的不一樣點在於阻塞線程解除阻塞後並非正常繼續運行,而是直接拋出InterruptedException異常。最後看tryAcquireNanos(int arg, long nanosTimeout)的實現:

// 獨佔模式下嘗試在指定超時時間內獲取資源,響應線程中斷
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

// 獨佔模式下帶超時時間限制的獲取資源方法
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 超時期限小於0納秒,快速失敗
    if (nanosTimeout <= 0L)
        return false;
    // 超時的最終期限是當前系統時鐘納秒+外部指定的nanosTimeout增量
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // 計算出剩餘的超時時間
            nanosTimeout = deadline - System.nanoTime();
            // 剩餘超時時間小於0說明已經超時則取消獲取
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 這裏會判斷剩餘超時時間大於1000納秒的時候纔會進行帶超時期限的線程阻塞,不然會進入下一輪獲取嘗試
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
            }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

tryAcquireNanos(int arg, long nanosTimeout)其實和doAcquireInterruptibly(int arg)相似,它們都響應線程中斷,不過tryAcquireNanos()在獲取資源的每一輪循環嘗試都會計算剩餘可用的超時時間,只有同時知足獲取失敗須要阻塞而且剩餘超時時間大於SPIN_FOR_TIMEOUT_THRESHOLD(1000納秒)的狀況下才會進行阻塞。

獨佔模式的同步器的一個顯著特色就是:頭節點的第一個有效(非取消)的後繼節點,老是嘗試獲取資源,一旦獲取資源成功就會解除阻塞而且晉升爲頭節點,原來所在節點會移除出同步等待隊列,原來的隊列長度就會減小1,而後頭結點的第一個有效的後繼節點繼續開始競爭資源。

j-a-q-s-9

使用獨佔模式同步器的主要類庫有:

  • 可重入鎖ReentrantLock
  • 讀寫鎖ReentrantReadWriteLock中的寫鎖WriteLock

共享模式

共享(SHARED)模式中的"共享"的含義是:同一個時刻,若是有一個節點所在線程獲取(acuqire)原子狀態status成功,那麼它會解除阻塞被喚醒,而且會把喚醒狀態傳播到全部的後繼節點(換言之就是喚醒整個同步等待隊列中的全部節點)。共享模式同步器的功能主要由下面的四個方法提供:

  • acquireShared(int arg);申請獲取arg個原子狀態status(申請成功能夠簡單理解爲status = status - arg)。
  • acquireSharedInterruptibly(int arg):申請獲取arg個原子狀態status,響應線程中斷。
  • tryAcquireSharedNanos(int arg, long nanosTimeout):申請獲取arg個原子狀態status,帶超時的版本。
  • releaseShared(int arg):釋放arg個原子狀態status(釋放成功能夠簡單理解爲status = status + arg)。

先看acquireShared(int arg)的源碼:

// 共享模式下獲取資源
public final void acquireShared(int arg) {
    // 注意tryAcquireShared方法值爲整型,只有小於0的時候纔會加入同步等待隊列
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 共享模式下嘗試獲取資源,此方法須要由子類覆蓋
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// 共享模式下獲取資源和處理同步等待隊列的方法
private void doAcquireShared(int arg) {
    // 基於當前線程新建一個標記爲共享的新節點
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 若是當前節點的前驅節點是頭節點
            if (p == head) {
                // 每一輪循環都會調用tryAcquireShared嘗試獲取資源,除非阻塞或者跳出循環
                int r = tryAcquireShared(arg);
                if (r >= 0) {  // <= tryAcquireShared方法>=0說明直資源獲取成功
                    // 設置頭結點,而且傳播獲取資源成功的狀態,這個方法的做用是確保喚醒狀態傳播到全部的後繼節點
                    // 而後任意一個節點晉升爲頭節點都會喚醒其第一個有效的後繼節點,起到一個鏈式釋放和解除阻塞的動做
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判斷獲取資源失敗是否須要阻塞,這裏會把前驅節點的等待狀態CAS更新爲Node.SIGNAL
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

// 設置同步等待隊列的頭節點,判斷當前處理的節點的後繼節點是否共享模式的節點,若是共享模式的節點,
// propagate大於0或者節點的waitStatus爲PROPAGATE則進行共享模式下的釋放資源
private void setHeadAndPropagate(Node node, int propagate) {
    // h爲頭節點的中間變量
    Node h = head;
    // 設置當前處理節點爲頭節點
    setHead(node);
    // 這個判斷條件比較複雜:入參propagate大於0 || 頭節點爲null || 頭節點的狀態爲非取消 || 再次獲取頭節點爲null || 再次獲取頭節點不爲取消
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 當前節點(其實已經成爲頭節點)的第一個後繼節點爲null或者是共享模式的節點
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

// Release action for shared mode:共享模式下的釋放資源動做
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 頭節點不爲null而且不爲尾節點
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 若是頭節點等待狀態爲SIGNAL(-1)則CAS更新它爲0,更新成功後喚醒和解除其後繼節點的阻塞
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;
                // 喚醒頭節點的後繼節點
                unparkSuccessor(h);
            }
            // 若是頭節點的等待狀態爲0,則CAS更新它爲PROPAGATE(-3)
            else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;
            }
        // 頭節點沒有變動,則跳出循環
        if (h == head)
            break;
    }
}

其實代碼的實現和獨佔模式有不少相似的地方,一個很大的不一樣點是:共享模式同步器當節點獲取資源成功晉升爲頭節點以後,它會把自身的等待狀態經過CAS更新爲Node.PROPAGATE,下一個加入等待隊列的新節點會把頭節點的等待狀態值更新回Node.SIGNAL,標記後繼節點處於能夠被喚醒的狀態,若是趕上資源釋放,那麼這個阻塞的節點就能被喚醒解除阻塞。咱們仍是畫圖理解一下,先假設tryAcquireShared(int arg)老是返回小於0的值,入隊兩個阻塞的線程thread-1和thread-2,而後進行資源釋放確保tryAcquireShared(int arg)老是返回大於0的值:

j-a-q-s-10

看起來和獨佔模式下的同步等待隊列差很少,實際上真正不一樣的地方在於有節點解除阻塞和晉升爲頭節點的過程。所以咱們能夠先看releaseShared(int arg)的源碼:

// 共享模式下釋放資源
public final boolean releaseShared(int arg) {
    // 嘗試釋放資源成功則調用前面分析過的doReleaseShared以傳播喚醒狀態和unpark頭節點的後繼節點
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// 共享模式下嘗試釋放資源,必須由子類覆蓋
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

releaseShared(int arg)就是在tryReleaseShared(int arg)調用返回true的狀況下主動調用一次doReleaseShared()從而基於頭節點傳播喚醒狀態和unpark頭節點的後繼節點。接着以前的圖:

j-a-q-s-11

j-a-q-s-12

接着看acquireSharedInterruptibly(int arg)的源碼實現:

// 共享模式下獲取資源的方法,響應線程中斷
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 和非響應線程中斷的acquireShared方法相似,不過這裏解除阻塞以後直接拋出異常InterruptedException
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

最後看tryAcquireSharedNanos(int arg, long nanosTimeout)的源碼實現:

// 共享模式下獲取資源的方法,帶超時時間版本
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 注意這裏只要tryAcquireShared >= 0或者doAcquireSharedNanos返回true都認爲獲取資源成功
        return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 計算超時的最終期限    
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            //從新計算剩餘的超時時間 
            nanosTimeout = deadline - System.nanoTime();
            // 超時的狀況下直接取消獲取
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 知足阻塞狀態而且剩餘的超時時間大於閥值1000納秒則經過LockSupport.parkNanos()阻塞線程
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // 解除阻塞後判斷線程的中斷標記而且清空標記位,若是是處於中斷狀態則拋出InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

共享模式的同步器的一個顯著特色就是:頭節點的第一個有效(非取消)的後繼節點,老是嘗試獲取資源,一旦獲取資源成功就會解除阻塞而且晉升爲頭節點,原來所在節點會移除出同步等待隊列,原來的隊列長度就會減小1,從新設置頭節點的過程會傳播喚醒的狀態,簡單來講就是喚醒一個有效的後繼節點,只要一個節點能夠晉升爲頭節點,它的後繼節點就能被喚醒。節點的喚醒順序遵循相似於FIFO的原則,通俗說就是先阻塞或者阻塞時間最長則先被喚醒

j-a-q-s-13

使用共享模式同步器的主要類庫有:

  • 信號量Semaphore
  • 倒數柵欄CountDownLatch

Condition的實現

Condition實例的創建是在Lock接口的newCondition()方法,它是鎖條件等待的實現,基於做用或者語義能夠見Condition接口的相關API註釋:

Condition是對象監視器鎖方法Object#wait()、Object#notify()和Object#notifyAll()的替代實現,對象監視器鎖實現鎖的時候做用的效果是每一個鎖對象必須使用多個wait-set(JVM內置的等待隊列),經過Object提供的方法和監視器鎖結合使用就能達到Lock的實現效果。若是替換synchronized方法和語句而且結合使用Lock和Condition,就能替換而且達到對象監視器鎖的效果。

Condition必須固有地綁定在一個Lock的實現類上,也就是要經過Lock的實例創建Condition實例,並且Condition的方法調用使用必須在Lock的"鎖定代碼塊"中,這一點和synchronized關鍵字以及Object的相關JNI方法使用的狀況十分類似。

前文介紹過Condition接口提供的方法以及Condition隊列,也就是條件等待隊列,經過PPT畫圖簡單介紹了它的隊列節點組成。實際上,條件等待隊列須要結合同步等待隊列使用,這也恰好對應於前面提到的Condition的方法調用使用必須在Lock的鎖定代碼塊中。聽起來很懵逼,咱們慢慢分析一下ConditionObject的方法源碼就能知道具體的緣由。

先看ConditionObject#await()方法:

// 退出等待後主動進行中斷當前線程
private static final int REINTERRUPT = 1;
// 退出等待後拋出InterruptedException異常
private static final int THROW_IE   = -1;
/** 
 * 可中斷的條件等待實現
 * 一、當前線程處於中斷狀態則拋出InterruptedException
 * 二、保存getState返回的鎖狀態,而且使用此鎖狀態調用release釋放全部的阻塞線程
 * 三、線程加入等待隊列進行阻塞,直到signall或者中斷
 * 四、經過保存getState返回的鎖狀態調用acquire方法
 * 五、第4步中阻塞過程當中中斷則拋出InterruptedException
 */
public final void await() throws InterruptedException {
    // 若是線程是中斷狀態則清空中斷標記位而且拋出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 當前線程所在的新節點加入條件等待隊列
    Node node = addConditionWaiter();
    // 釋放當前AQS中的全部資源返回資源的status保存值,也就是基於status的值調用release(status) - 其實這一步是解鎖操做
    int savedState = fullyRelease(node);
    // 初始化中斷模式
    int interruptMode = 0;
    // 若是節點新建的節點不位於同步隊列中(理論上應該是必定不存在),則對節點所在線程進行阻塞,第二輪循環理論上節點必定在同步等待隊列中
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 處理節點所在線程中斷的轉換操做
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 節點所在線程被喚醒後,若是節點所在線程沒有處於中斷狀態,則以獨佔模式進行頭節點競爭
    // 注意這裏使用的status是前面釋放資源時候返回的保存下來的status
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 下一個等待節點不空,則從等待隊列中移除全部取消的等待節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // interruptMode不爲0則按照中斷模式進行不一樣的處理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// 釋放當前AQS中的全部資源,其實也就是基於status的值調用release(status)
// 這一步對於鎖實現來講,就是一個解鎖操做
final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        // 釋放失敗則標記等待狀態爲取消
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

// 傳入的節點是否在同步隊列中
final boolean isOnSyncQueue(Node node) {
    // 節點等待您狀態爲CONDITION或者前驅節點爲null則返回false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 由於等待隊列是經過nextWaiter鏈接,next引用存在說明節點位於同步隊列
    if (node.next != null)
        return true;
    // 從同步隊列的尾部向前遍歷是否存在傳入的節點實例
    return findNodeFromTail(node);
}

// 從同步隊列的尾部向前遍歷是否存在傳入的節點實例
private boolean findNodeFromTail(Node node) {
    for (Node p = tail;;) {
        if (p == node)
            return true;
        if (p == null)
            return false;
        p = p.prev;
    }
}

// 這是一個很複雜的判斷,用了兩個三目表達式,做用是若是新建的等待節點所在線程中斷,
// 則把節點的狀態由CONDITION更新爲0,而且加入到同步等待隊列中,返回THROW_IE中斷狀態,若是加入同步隊列失敗,返回REINTERRUPT
// 若是新建的等待節點所在線程沒有中斷,返回0,也就是初始狀態的interruptMode
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

// 節點線程中斷取消等待後的轉換操做
final boolean transferAfterCancelledWait(Node node) {
    // CAS更新節點的狀態由CONDITION更改成0
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        // 節點加入同步等待隊列
        enq(node);
        return true;
    }
    // 這裏嘗試自旋,直到節點加入同步等待隊列成功
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

// 等待完畢後報告中斷處理,前邊的邏輯獲得的interruptMode若是爲THROW_IE則拋出InterruptedException,若是爲REINTERRUPT則中斷當前線程
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

其實上面的await()邏輯並不複雜,前提是理解了對象監視器鎖那套等待和喚醒的機制(由JVM實現,C語言學得好的能夠去看下源碼),這裏只是經過算法和數據結構從新進行了一次實現。await()主要使用了兩個隊列:同步等待隊列和條件等待隊列。咱們先假設有兩個線程thread-1和thread-2調用了下面的代碼中的process()方法:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void process(){
    try{
        lock.lock();
        condition.await();
        // 省略其餘邏輯...
    }finally{
        lock.unlock();
    }
}

ReentrantLock使用的是AQS獨佔模式的實現,所以在調用lock()方法的時候,同步等待隊列的一個瞬時快照(假設線程thread-1先加入同步等待隊列)可能以下:

j-a-q-s-14.png

接着,線程thread-1所在節點是頭節點的後繼節點,獲取鎖成功,它解除阻塞後能夠調用await()方法,這個時候會釋放同步等待隊列中的全部等待節點,也就是線程thread-2所在的節點也被釋放,所以線程thread-2也會調用await()方法:

j-a-q-s-15.png

只要有線程可以到達await()方法,那麼原來的同步器中的同步等待隊列就會釋放全部阻塞節點,表現爲釋放鎖,而後這些釋放掉的節點會加入到等待隊列中,等待隊列中的節點也是阻塞的,這個時候只有經過signal()或者signalAll()進行隊列元素轉移纔有機會喚醒阻塞的線程。所以接着看signal()signalAll()的源碼實現:

// 從等待隊列中移動一個等待時間最長的線程(若是過存在的話)到鎖同步等待隊列中
public final void signal() {
    // 判斷當前線程是否和獨佔線程一致,其實就是此操做須要在鎖代碼塊中執行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// 基於第一個等待節點進行Signal操做
private void doSignal(Node first) {
    do {
        // 首節點的下一個等待節點爲空,說明只剩下一個等待節點
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 當前處理節點從鏈表從移除    
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// 喚醒的轉換操做
final boolean transferForSignal(Node node) {
    // CAS更新節點狀態由CONDITION到0,更新失敗則返回false不喚醒
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    // 節點做爲新節點從新加入到同步等待隊列
    Node p = enq(node);
    int ws = p.waitStatus;
    // 取消或者更新節點等待狀態爲SIGNAL的節點須要解除阻塞進行從新同步,這裏的操做只針對取消和狀態異常的節點
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

// 從等待隊列中移動全部等待時間最長的線程(若是過存在的話)到鎖同步等待隊列中
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
        if (first != null)
        doSignalAll(first);
}

// 基於第一個等待節點進行SignalAll操做
private void doSignalAll(Node first) {
    // 置空lastWaiter和firstWaiter
    lastWaiter = firstWaiter = null;
    do {
        // 獲取下一個等待節點
        Node next = first.nextWaiter;
        // 當前處理節點從鏈表從移除
        first.nextWaiter = null;
        // 處理當前節點
        transferForSignal(first);
        // 更新中間引用
        first = next;
    } while (first != null);
}

其實signal()或者signalAll()會對取消的節點或者短暫中間狀態的節點進行解除阻塞,可是正常狀況下,它們的操做結果是把阻塞等待時間最長的一個或者全部節點從新加入到AQS的同步等待隊列中。例如,上面的例子調用signal()方法後以下:

j-a-q-s-16.png

這樣子,至關於線程thread-1從新加入到AQS同步等待隊列中,而且開始競爭頭節點,一旦競爭成功,就可以解除阻塞。這個時候從邏輯上看,signal()方法最終解除了對線程thread-1的阻塞。await()的其餘變體方法的原理是相似的,這裏由於篇幅緣由再也不展開。這裏小結一下Condition的顯著特色:

  • 一、同時依賴兩個同步等待隊列,一個是AQS提供,另外一個是ConditionObject提供的。
  • 二、await()方法會釋放AQS同步等待隊列中的阻塞節點,這些節點會加入到條件隊列中進行阻塞。
  • 三、signal()或者signalAll()會把條件隊列中的節點從新加入AQS同步等待隊列中,並不解除正常節點的阻塞狀態。
  • 四、接第3步,這些進入到AQS同步等待隊列的節點會從新競爭成爲頭節點,其實也就是前面分析過的獨佔模式下的AQS的運做原理。

取消獲取資源(cancelAcquire)

新節點加入等待隊列失敗致使任何類型的異常或者帶超時版本的API調用的時候剩餘超時時間小於等於零的時候,就會調用cancelAcquire()方法,用於取消該節點對應節點獲取資源的操做。

// 取消節點獲取資源的操做
private void cancelAcquire(Node node) {
    // 節點爲null直接返回
    if (node == null)
        return;
    // 置空節點持有的線程,由於此時節點線程已經發生中斷
    node.thread = null;
    Node pred = node.prev;
    // 這個循環是爲了獲取當前節點的上一個不爲取消狀態的節點,也就是中間若是發生了取消的節點都直接斷開
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 保存當前節點的上一個不爲取消狀態的節點的後繼節點    
    Node predNext = pred.next;
    // 當前節點等待狀態更新爲CANCELLED
    node.waitStatus = Node.CANCELLED;
    // 若是當前節點爲尾節點,則直接更新尾節點爲當前節點的上一個不爲取消狀態的節點
    if (node == tail && compareAndSetTail(node, pred)) {
         // 而後更新該節點的後繼節點爲null,由於它已經成爲新的尾節點
         pred.compareAndSetNext(predNext, null);
    } else {
        int ws;
        // 當前節點的上一個不爲取消狀態的節點已經不是頭節點的狀況,須要把當前取消的節點從AQS同步等待隊列中斷開
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            // 當前節點的上一個不爲取消狀態的節點已是頭節點,至關於頭節點以後的節點都是取消,須要喚醒當前節點的後繼節點
            unparkSuccessor(node);
        }
        // 節點後繼節點設置爲自身,那麼就不會影響後繼節點
        node.next = node;
    }
}

cancelAcquire()方法有多處調用,主要包括下面的狀況:

  • 一、節點線程在阻塞過程當中主動中斷的狀況下會調用。
  • 二、acquire的處理過程發生任何異常的狀況下都會調用,包括tryAcquire()tryAcquireShared()等。
  • 三、新節點加入等待隊列失敗致使任何類型的異常或者帶超時版本的API調用的時候剩餘超時時間小於等於零的時候。

cancelAcquire()主要做用是把取消的節點移出同步等待隊列,必須時候須要進行後繼節點的喚醒。

實戰篇

AQS是一個抽象的同步器基礎框架,其實咱們也能夠直接使用它實現一些高級的併發框架。下面基於AQS實現一些非內建的功能,這兩個例子來自於AQS的註釋中。

metux

大學C語言課程中常常說起到的只有一個資源的metux(互斥區),也就是說,同一個時刻,只能有一個線程獲取到資源,其餘獲取資源的線程須要阻塞等待到前一個線程釋放資源。

public class Metux implements Lock, Serializable {

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            assert 1 == arg;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            assert 1 == arg;
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public Condition newCondition() {
            return new ConditionObject();
        }

        public boolean isLocked() {
            return getState() != 0;
        }

        @Override
        public boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0);
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    public boolean isLocked() {
        return sync.isLocked();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public static void main(String[] args) throws Exception {
        final Metux metux = new Metux();
        new Thread(() -> {
            metux.lock();
            System.out.println(String.format("%s-thread-1獲取鎖成功休眠3秒...", LocalDateTime.now()));
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                //ignore
            }
            metux.unlock();
            System.out.println(String.format("%s-thread-1獲解鎖成功...", LocalDateTime.now()));
            return;
        }, "thread-1").start();
        new Thread(() -> {
            metux.lock();
            System.out.println(String.format("%s-thread-2獲取鎖成功...",LocalDateTime.now()));
            return;
        }, "thread-2").start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

某個時間的某次運行結果以下:

2019-04-07T11:49:27.858791200-thread-1獲取鎖成功休眠3秒...
2019-04-07T11:49:30.876567-thread-2獲取鎖成功...
2019-04-07T11:49:30.876567-thread-1獲解鎖成功...

二元柵欄

二元柵欄是CountDownLatch的簡化版,只容許一個線程阻塞,由另外一個線程負責喚醒。

public class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {

        boolean isSignalled() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    public void signal() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public static void main(String[] args) throws Exception {
        BooleanLatch latch = new BooleanLatch();
        new Thread(()-> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                //ignore
            }
            latch.signal();
        }).start();
        System.out.println(String.format("[%s]-主線程進入阻塞...", LocalDateTime.now()));
        latch.await();
        System.out.println(String.format("[%s]-主線程進被喚醒...", LocalDateTime.now()));
    }
}

某個時間的某次運行結果以下:

[2019-04-07T11:55:12.647816200]-主線程進入阻塞...
[2019-04-07T11:55:15.632088]-主線程進被喚醒...

小結

在JUC的重要併發類庫或者容器中,AQS起到了基礎框架的做用,理解同步器的實現原理,有助於理解和分析其餘併發相關類庫的實現。這篇文章先後耗費了接近1個月時間編寫,DEBUG過程最好使用多線程斷點,不然很難模擬真實的狀況。AQS裏面的邏輯是相對複雜的,很敬佩併發大師Douglas S. Lea如此精巧的類庫設計。

參考資料:

  • 《The Art of Multiprocessor Programming》
  • 《The java.util.concurrent Synchronizer Framework》
  • JDK11相關源碼

(本文完 c-a-30-d e-a-20190407)

相關文章
相關標籤/搜索