AbstractQueuedSynchronizer源碼解讀

1. 背景

AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer)是Doug Lea大師創做的用來構建鎖或者其餘同步組件(信號量、事件等)的基礎框架類。JDK中許多併發工具類的內部實現都依賴於AQS,如ReentrantLock, Semaphore, CountDownLatch等等。學習AQS的使用與源碼實現對深刻理解concurrent包中的類有很大的幫助。
本文重點介紹AQS中的基本實現思路,包括獨佔鎖、共享鎖的獲取和釋放實現原理和一些代碼細節。html

對於AQS中ConditionObject的相關實現,能夠參考個人另外一篇博文AbstractQueuedSynchronizer源碼解讀--續篇之Conditionjava

2. 簡介

AQS的主要使用方式是繼承它做爲一個內部輔助類實現同步原語,它能夠簡化你的併發工具的內部實現,屏蔽同步狀態管理、線程的排隊、等待與喚醒等底層操做。node

AQS設計基於模板方法模式,開發者須要繼承同步器而且重寫指定的方法,將其組合在併發組件的實現中,調用同步器的模板方法,模板方法會調用使用者重寫的方法。編程

3. 實現思路

下面介紹下AQS具體實現的大體思路。api

AQS內部維護一個CLH隊列來管理鎖。
線程會首先嚐試獲取鎖,若是失敗,則將當前線程以及等待狀態等信息包成一個Node節點加到同步隊列裏。
接着會不斷循環嘗試獲取鎖(條件是當前節點爲head的直接後繼纔會嘗試),若是失敗則會阻塞本身,直至被喚醒;
而當持有鎖的線程釋放鎖時,會喚醒隊列中的後繼線程。安全

下面列舉JDK中幾種常見使用了AQS的同步組件:數據結構

  • ReentrantLock: 使用了AQS的獨佔獲取和釋放,用state變量記錄某個線程獲取獨佔鎖的次數,獲取鎖時+1,釋放鎖時-1,在獲取時會校驗線程是否能夠獲取鎖。
  • Semaphore: 使用了AQS的共享獲取和釋放,用state變量做爲計數器,只有在大於0時容許線程進入。獲取鎖時-1,釋放鎖時+1。
  • CountDownLatch: 使用了AQS的共享獲取和釋放,用state變量做爲計數器,在初始化時指定。只要state還大於0,獲取共享鎖會由於失敗而阻塞,直到計數器的值爲0時,共享鎖才容許獲取,全部等待線程會被逐一喚醒。

3.1 如何獲取鎖

獲取鎖的思路很直接:多線程

while (不知足獲取鎖的條件) {
    把當前線程包裝成節點插入同步隊列
    if (須要阻塞當前線程)
        阻塞當前線程直至被喚醒
}
將當前線程從同步隊列中移除

以上是一個很簡單的獲取鎖的僞代碼流程,AQS的具體實現比這個複雜一些,也稍有不一樣,但思想上是與上述僞代碼契合的。
經過循環檢測是否可以獲取到鎖,若是不知足,則可能會被阻塞,直至被喚醒。併發

3.2 如何釋放鎖

釋放鎖的過程設計修改同步狀態,以及喚醒後繼等待線程:框架

修改同步狀態
if (修改後的狀態容許其餘線程獲取到鎖)
    喚醒後繼線程

這只是很簡略的釋放鎖的僞代碼示意,AQS具體實現中能看到這個簡單的流程模型。

3.3 API簡介

經過上面的AQS大致思路分析,咱們能夠看到,AQS主要作了三件事情

  • 同步狀態的管理
  • 線程的阻塞和喚醒
  • 同步隊列的維護

下面三個protected final方法是AQS中用來訪問/修改同步狀態的方法:

  • int getState(): 獲取同步狀態

  • void setState(): 設置同步狀態

  • boolean compareAndSetState(int expect, int update):基於CAS,原子設置當前狀態

在自定義基於AQS的同步工具時,咱們能夠選擇覆蓋實現如下幾個方法來實現同步狀態的管理:

方法 描述
boolean tryAcquire(int arg) 試獲取獨佔鎖
boolean tryRelease(int arg) 試釋放獨佔鎖
int tryAcquireShared(int arg) 試獲取共享鎖
boolean tryReleaseShared(int arg) 試釋放共享鎖
boolean isHeldExclusively() 當前線程是否得到了獨佔鎖

以上的幾個試獲取/釋放鎖的方法的具體實現應當是無阻塞的。

AQS自己將同步狀態的管理用模板方法模式都封裝好了,如下列舉了AQS中的一些模板方法:

方法 描述
void acquire(int arg) 獲取獨佔鎖。會調用tryAcquire方法,若是未獲取成功,則會進入同步隊列等待
void acquireInterruptibly(int arg) 響應中斷版本的acquire
boolean tryAcquireNanos(int arg,long nanos) 響應中斷+帶超時版本的acquire
void acquireShared(int arg) 獲取共享鎖。會調用tryAcquireShared方法
void acquireSharedInterruptibly(int arg) 響應中斷版本的acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos) 響應中斷+帶超時版本的acquireShared
boolean release(int arg) 釋放獨佔鎖
boolean releaseShared(int arg) 釋放共享鎖
Collection getQueuedThreads() 獲取同步隊列上的線程集合

上面看上去不少方法,其實從語義上來區分就是獲取和釋放,從模式上區分就是獨佔式和共享式,從中斷相應上來看就是支持和不支持。

4. 代碼解讀

4.1 數據結構定義

首先看一下AQS中的嵌套類Node的定義。

static final class Node {

    /**
     * 用於標記一個節點在共享模式下等待
     */
    static final Node SHARED = new Node();

    /**
     * 用於標記一個節點在獨佔模式下等待
     */
    static final Node EXCLUSIVE = null;

    /**
     * 等待狀態:取消
     */
    static final int CANCELLED = 1;

    /**
     * 等待狀態:通知
     */
    static final int SIGNAL = -1;

    /**
     * 等待狀態:條件等待
     */
    static final int CONDITION = -2;

    /**
     * 等待狀態:傳播
     */
    static final int PROPAGATE = -3;

    /**
     * 等待狀態
     */
    volatile int waitStatus;

    /**
     * 前驅節點
     */
    volatile Node prev;

    /**
     * 後繼節點
     */
    volatile Node next;

    /**
     * 節點對應的線程
     */
    volatile Thread thread;

    /**
     * 等待隊列中的後繼節點
     */
    Node nextWaiter;

    /**
     * 當前節點是否處於共享模式等待
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 獲取前驅節點,若是爲空的話拋出空指針異常
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null) {
            throw new NullPointerException();
        } else {
            return p;
        }
    }

    Node() {
    }

    /**
     * addWaiter會調用此構造函數
     */
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * Condition會用到此構造函數
     */
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

這裏有必要專門梳理一下節點等待狀態的定義,由於AQS源碼中有大量的狀態判斷與躍遷。

描述
CANCELLED (1) 當前線程由於超時或者中斷被取消。這是一個終結態,也就是狀態到此爲止。
SIGNAL (-1) 當前線程的後繼線程被阻塞或者即將被阻塞,當前線程釋放鎖或者取消後須要喚醒後繼線程。這個狀態通常都是後繼線程來設置前驅節點的。
CONDITION (-2) 當前線程在condition隊列中。
PROPAGATE (-3) 用於將喚醒後繼線程傳遞下去,這個狀態的引入是爲了完善和加強共享鎖的喚醒機制。在一個節點成爲頭節點以前,是不會躍遷爲此狀態的
0 表示無狀態。

對於分析AQS中不涉及ConditionObject部分的代碼,能夠認爲隊列中的節點狀態只會是CANCELLED, SIGNAL, PROPAGATE, 0這幾種狀況。


上圖爲自制的AQS狀態的流轉圖,AQS中0狀態和CONDITION狀態爲始態,CANCELLED狀態爲終態。0狀態同時也能夠是節點生命週期的終態。
注意,上圖僅表示狀態之間流轉的可達性,並不表明必定可以從一個狀態沿着線隨意躍遷。

在AQS中包含了head和tail兩個Node引用,其中head在邏輯上的含義是當前持有鎖的線程,head節點其實是一個虛節點,自己並不會存儲線程信息。
當一個線程沒法獲取鎖而被加入到同步隊列時,會用CAS來設置尾節點tail爲當前線程對應的Node節點。
head和tail在AQS中是延遲初始化的,也就是在須要的時候纔會被初始化,也就意味着在全部線程都能獲取到鎖的狀況下,隊列中的head和tail都會是null。

4.2 獲取獨佔鎖的實現

下面來具體看看acquire(int arg)的實現:

/**
 * 獲取獨佔鎖,對中斷不敏感。
 * 首先嚐試獲取一次鎖,若是成功,則返回;
 * 不然會把當前線程包裝成Node插入到隊列中,在隊列中會檢測是否爲head的直接後繼,並嘗試獲取鎖,
 * 若是獲取失敗,則會經過LockSupport阻塞當前線程,直至被釋放鎖的線程喚醒或者被中斷,隨後再次嘗試獲取鎖,如此反覆。
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

/**
 * 在隊列中新增一個節點。
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 快速嘗試
    if (pred != null) {
        node.prev = pred;
        // 經過CAS在隊尾插入當前節點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 初始狀況或者在快速嘗試失敗後插入節點
    enq(node);
    return node;
}

/**
 * 經過循環+CAS在隊列中成功插入一個節點後返回。
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 初始化head和tail
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            /*
             * AQS的精妙就是體如今不少細節的代碼,好比須要用CAS往隊尾裏增長一個元素
             * 此處的else分支是先在CAS的if前設置node.prev = t,而不是在CAS成功以後再設置。
             * 一方面是基於CAS的雙向鏈表插入目前沒有完美的解決方案,另外一方面這樣子作的好處是:
             * 保證每時每刻tail.prev都不會是一個null值,不然若是node.prev = t
             * 放在下面if的裏面,會致使一個瞬間tail.prev = null,這樣會使得隊列不完整。
             */
            node.prev = t;
            // CAS設置tail爲node,成功後把老的tail也就是t鏈接到node。
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * 在隊列中的節點經過此方法獲取鎖,對中斷不敏感。
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /*
             * 檢測當前節點前驅是否head,這是試獲取鎖的資格。
             * 若是是的話,則調用tryAcquire嘗試獲取鎖,
             * 成功,則將head置爲當前節點。
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /*
             * 若是未成功獲取鎖則根據前驅節點判斷是否要阻塞。
             * 若是阻塞過程當中被中斷,則置interrupted標誌位爲true。
             * shouldParkAfterFailedAcquire方法在前驅狀態不爲SIGNAL的狀況下都會循環重試獲取鎖。
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * 根據前驅節點中的waitStatus來判斷是否須要阻塞當前線程。
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前驅節點設置爲SIGNAL狀態,在釋放鎖的時候會喚醒後繼節點,
         * 因此後繼節點(也就是當前節點)如今能夠阻塞本身。
         */
        return true;
    if (ws > 0) {
        /*
         * 前驅節點狀態爲取消,向前遍歷,更新當前節點的前驅爲往前第一個非取消節點。
         * 當前線程會以後會再次回到循環並嘗試獲取鎖。
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         /**
          * 等待狀態爲0或者PROPAGATE(-3),設置前驅的等待狀態爲SIGNAL,
          * 而且以後會回到循環再次重試獲取鎖。
          */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


/**
 * 該方法實現某個node取消獲取鎖。
 */
private void cancelAcquire(Node node) {

   if (node == null)
       return;

   node.thread = null;

   // 遍歷並更新節點前驅,把node的prev指向前部第一個非取消節點。
   Node pred = node.prev;
   while (pred.waitStatus > 0)
       node.prev = pred = pred.prev;

   // 記錄pred節點的後繼爲predNext,後續CAS會用到。
   Node predNext = pred.next;

   // 直接把當前節點的等待狀態置爲取消,後繼節點即使也在cancel能夠跨越node節點。
   node.waitStatus = Node.CANCELLED;

   /*
    * 若是CAS將tail從node置爲pred節點了
    * 則剩下要作的事情就是嘗試用CAS將pred節點的next更新爲null以完全切斷pred和node的聯繫。
    * 這樣一來就斷開了pred與pred的全部後繼節點,這些節點因爲變得不可達,最終會被回收掉。
    * 因爲node沒有後繼節點,因此這種狀況到這裏整個cancel就算是處理完畢了。
    *
    * 這裏的CAS更新pred的next即便失敗了也不要緊,說明有其它新入隊線程或者其它取消線程更新掉了。
    */
   if (node == tail && compareAndSetTail(node, pred)) {
       compareAndSetNext(pred, predNext, null);
   } else {
       // 若是node還有後繼節點,這種狀況要作的事情是把pred和後繼非取消節點拼起來。
       int ws;
       if (pred != head &&
           ((ws = pred.waitStatus) == Node.SIGNAL ||
            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
           pred.thread != null) {
           Node next = node.next;
           /* 
            * 若是node的後繼節點next非取消狀態的話,則用CAS嘗試把pred的後繼置爲node的後繼節點
            * 這裏if條件爲false或者CAS失敗都不要緊,這說明可能有多個線程在取消,總歸會有一個能成功的。
            */
           if (next != null && next.waitStatus <= 0)
               compareAndSetNext(pred, predNext, next);
       } else {
           /*
            * 這時說明pred == head或者pred狀態取消或者pred.thread == null
            * 在這些狀況下爲了保證隊列的活躍性,須要去喚醒一次後繼線程。
            * 舉例來講pred == head徹底有可能實際上目前已經沒有線程持有鎖了,
            * 天然就不會有釋放鎖喚醒後繼的動做。若是不喚醒後繼,隊列就掛掉了。
            * 
            * 這種狀況下看似因爲沒有更新pred的next的操做,隊列中可能會留有一大把的取消節點。
            * 實際上沒關係,由於後繼線程喚醒以後會走一次試獲取鎖的過程,
            * 失敗的話會走到shouldParkAfterFailedAcquire的邏輯。
            * 那裏面的if中有處理前驅節點若是爲取消則維護pred/next,踢掉這些取消節點的邏輯。
            */
           unparkSuccessor(node);
       }
       
       /*
        * 取消節點的next之因此設置爲本身自己而不是null,
        * 是爲了方便AQS中Condition部分的isOnSyncQueue方法,
        * 判斷一個原先屬於條件隊列的節點是否轉移到了同步隊列。
        *
        * 由於同步隊列中會用到節點的next域,取消節點的next也有值的話,
        * 能夠斷言next域有值的節點必定在同步隊列上。
        *
        * 在GC層面,和設置爲null具備相同的效果。
        */
       node.next = node; 
   }
}

/**
 * 喚醒後繼線程。
 */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 嘗試將node的等待狀態置爲0,這樣的話,後繼爭用線程能夠有機會再嘗試獲取一次鎖。
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    /*
     * 這裏的邏輯就是若是node.next存在而且狀態不爲取消,則直接喚醒s便可
     * 不然須要從tail開始向前找到node以後最近的非取消節點。
     *
     * 這裏爲何要從tail開始向前查找也是值得琢磨的:
     * 若是讀到s == null,不表明node就爲tail,參考addWaiter以及enq函數中的個人註釋。
     * 不妨考慮到以下場景:
     * 1. node某時刻爲tail
     * 2. 有新線程經過addWaiter中的if分支或者enq方法添加本身
     * 3. compareAndSetTail成功
     * 4. 此時這裏的Node s = node.next讀出來s == null,但事實上node已經不是tail,它有後繼了!
     */
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

AQS獨佔鎖的獲取的流程示意以下:

4.3 釋放獨佔鎖的實現

上面已經分析了acquire的實現,下面來看看release的實現:
對於釋放一個獨佔鎖,首先會調用tryRelease,在徹底釋放掉獨佔鎖後,這時後繼線程是能夠獲取到獨佔鎖的,
所以釋放者線程須要作的事情是喚醒一個隊列中的後繼者線程,讓它去嘗試獲取獨佔鎖。

上述所謂徹底釋放掉鎖的含義,簡單來講就是當前鎖處於無主狀態,等待線程有可能能夠獲取。
舉例:對於可重入鎖ReentrantLock, 每次tryAcquire後,state會+1,每次tryRelease後,state會-1,若是state變爲0了,則此時稱獨佔鎖被徹底釋放了。

下面,咱們來看一下release的具體代碼實現:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        /*
         * 此時的head節點可能有3種狀況:
         * 1. null (AQS的head延遲初始化+無競爭的狀況)
         * 2. 當前線程在獲取鎖時new出來的節點經過setHead設置的
         * 3. 因爲經過tryRelease已經徹底釋放掉了獨佔鎖,有新的節點在acquireQueued中獲取到了獨佔鎖,並設置了head

         * 第三種狀況能夠再分爲兩種狀況:
         * (一)時刻1:線程A經過acquireQueued,持鎖成功,set了head
         *          時刻2:線程B經過tryAcquire試圖獲取獨佔鎖失敗失敗,進入acquiredQueued
         *          時刻3:線程A經過tryRelease釋放了獨佔鎖
         *          時刻4:線程B經過acquireQueued中的tryAcquire獲取到了獨佔鎖並調用setHead
         *          時刻5:線程A讀到了此時的head其實是線程B對應的node
         * (二)時刻1:線程A經過tryAcquire直接持鎖成功,head爲null
         *          時刻2:線程B經過tryAcquire試圖獲取獨佔鎖失敗失敗,入隊過程當中初始化了head,進入acquiredQueued
         *          時刻3:線程A經過tryRelease釋放了獨佔鎖,此時線程B還未開始tryAcquire
         *          時刻4:線程A讀到了此時的head其實是線程B初始化出來的傀儡head
         */
        Node h = head;
        // head節點狀態不會是CANCELLED,因此這裏h.waitStatus != 0至關於h.waitStatus < 0
        if (h != null && h.waitStatus != 0)
            // 喚醒後繼線程,此函數在acquire中已經分析過,再也不列舉說明
            unparkSuccessor(h);
        return true;
    }
    return false;
}

整個release作的事情就是

  1. 調用tryRelease
  2. 若是tryRelease返回true也就是獨佔鎖被徹底釋放,喚醒後繼線程。

這裏的喚醒是根據head幾點來判斷的,上面代碼的註釋中也分析了head節點的狀況,只有在head存在而且等待狀態小於零的狀況下喚醒。

4.4 獲取共享鎖的實現

與獲取獨佔鎖的實現不一樣的關鍵在於,共享鎖容許多個線程持有。
若是須要使用AQS中共享鎖,在實現tryAcquireShared方法時須要注意,返回負數表示獲取失敗;返回0表示成功,可是後繼爭用線程不會成功;返回正數表示
獲取成功,而且後繼爭用線程也可能成功。

下面來看一下具體的代碼實現:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 一旦共享獲取成功,設置新的頭結點,而且喚醒後繼線程
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * 這個函數作的事情有兩件:
 * 1. 在獲取共享鎖成功後,設置head節點
 * 2. 根據調用tryAcquireShared返回的狀態以及節點自己的等待狀態來判斷是否要須要喚醒後繼線程。
 */
private void setHeadAndPropagate(Node node, int propagate) {
    // 把當前的head封閉在方法棧上,用如下面的條件檢查。
    Node h = head;
    setHead(node);
    /*
     * propagate是tryAcquireShared的返回值,這是決定是否傳播喚醒的依據之一。
     * h.waitStatus爲SIGNAL或者PROPAGATE時也根據node的下一個節點共享來決定是否傳播喚醒,
     * 這裏爲何不能只用propagate > 0來決定是否能夠傳播在本文下面的思考問題中有相關講述。
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

/**
 * 這是共享鎖中的核心喚醒函數,主要作的事情就是喚醒下一個線程或者設置傳播狀態。
 * 後繼線程被喚醒後,會嘗試獲取共享鎖,若是成功以後,則又會調用setHeadAndPropagate,將喚醒傳播下去。
 * 這個函數的做用是保障在acquire和release存在競爭的狀況下,保證隊列中處於等待狀態的節點可以有辦法被喚醒。
 */
private void doReleaseShared() {
    /*
     * 如下的循環作的事情就是,在隊列存在後繼線程的狀況下,喚醒後繼線程;
     * 或者因爲多線程同時釋放共享鎖因爲處在中間過程,讀到head節點等待狀態爲0的狀況下,
     * 雖然不能unparkSuccessor,但爲了保證喚醒可以正確穩固傳遞下去,設置節點狀態爲PROPAGATE。
     * 這樣的話獲取鎖的線程在執行setHeadAndPropagate時能夠讀到PROPAGATE,從而由獲取鎖的線程去釋放後繼等待線程。
     */
    for (;;) {
        Node h = head;
        // 若是隊列中存在後繼線程。
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            // 若是h節點的狀態爲0,須要設置爲PROPAGATE用以保證喚醒的傳播。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 檢查h是否仍然是head,若是不是的話須要再進行循環。
        if (h == head)
            break;
    }
}

4.5 釋放共享鎖的實現

釋放共享鎖與獲取共享鎖的代碼共享了doReleaseShared,用於實現喚醒的傳播。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // doReleaseShared的實現上面獲取共享鎖已經介紹
        doReleaseShared();
        return true;
    }
    return false;
}

從中,咱們能夠看出,共享鎖的獲取和釋放都會涉及到doReleaseShared,也就是後繼線程的喚醒。關於PROPAGATE狀態的必要性,後文會做進一步介紹。

5. 一些思考

AQS的代碼實在是很精妙,要看懂大體套路並不困難,可是要徹底領悟其中的一些細節是一件須要花功夫來仔細琢磨品味的事情。

下面列出一些看源碼時的問題與思考:

5.1 插入節點時的代碼順序

addWaiter和enq方法中新增一個節點時爲何要先將新節點的prev置爲tail再嘗試CAS,而不是CAS成功後來構造節點之間的雙向連接?
這是由於,雙向鏈表目前沒有基於CAS原子插入的手段,若是咱們將node.prev = tt.next = node(t爲方法執行時讀到的tail,引用封閉在棧上)
放到compareAndSetTail(t, node)成功後執行,以下所示:

if (compareAndSetTail(t, node)) {
   node.prev = t;
   t.next = node;
   return t;
}

會致使這一瞬間的tail也就是t的prev爲null,這就使得這一瞬間隊列處於一種不一致的中間狀態。

5.2 喚醒節點時爲何從tail向前遍歷

unparkSuccessor方法中爲何喚醒後繼節點時要從tail向前查找最接近node的非取消節點,而不是直接從node向後找到第一個後break掉?

在上面的代碼註釋中已經說起到這一點:
若是讀到s == null,不表明node就爲tail。
考慮以下場景:

  1. node某時刻爲tail
  2. 有新線程經過addWaiter中的if分支或者enq方法添加本身
  3. compareAndSetTail成功
  4. 此時這裏的Node s = node.next讀出來s == null,但事實上node已經不是tail,它有後繼了!

5.3 unparkSuccessor有新線程爭鎖是否存在漏洞

unparkSuccessor方法在被release調用時是否存在這樣的一個漏洞?
時刻1: node -> tail && tail.waitStatus == Node.CANCELLED (node的下一個節點爲tail,而且tail處於取消狀態)
時刻2: unparkSuccessor讀到s.waitStatus > 0
時刻3: unparkSuccessor從tail開始遍歷
時刻4: tail節點對應線程執行cancelAcquire方法中的if (node == tail && compareAndSetTail(node, pred)) 返回true,
此時tail變爲pred(也就是node)
時刻5: 有新線程進隊列tail變爲新節點
時刻6: unparkSuccessor沒有發現須要喚醒的節點
最終新節點阻塞而且前驅節點結束調用,新節點再也沒法被unpark
這種狀況不會發生,確實可能出現從tail向前掃描,沒有讀到新入隊的節點,但別忘了acquireQueued的思想就是不斷循環檢測是否可以獨佔獲取鎖,
不然再進行判斷是否要阻塞本身,而release的第一步就是tryRelease,它的語義爲true表示徹底釋放獨佔鎖,徹底釋放以後纔會執行後面的邏輯,也就是unpark後繼線程。在這種狀況下,新入隊的線程應當能獲取到鎖。
若是沒有獲取鎖,則必然是在覆蓋tryAcquire/tryRelease的實現有問題,致使前驅節點成功釋放了獨佔鎖,後繼節點獲取獨佔鎖仍然失敗。也就是說AQS框架的可靠性還在
某些程度上依賴於具體子類的實現,子類實現若是有bug,那AQS再精巧也扛不住。

5.4 AQS如何保證隊列活躍

AQS如何保證在節點釋放的同時又有新節點入隊的狀況下,不出現原持鎖線程釋放鎖,後繼線程被本身阻塞死的狀況,保持同步隊列的活躍?
回答這個問題,須要理解shouldParkAfterFailedAcquire和unparkSuccessor這兩個方法。
以獨佔鎖爲例,後繼爭用線程阻塞本身的狀況是讀到前驅節點的等待狀態爲SIGNAL,只要不是這種狀況都會再試着去爭取鎖。
假設後繼線程讀到了前驅狀態爲SIGNAL,說明以前在tryAcquire的時候,前驅持鎖線程尚未tryRelease徹底釋放掉獨佔鎖。
此時若是前驅線程徹底釋放掉了獨佔鎖,則在unparkSuccessor中還沒執行完置waitStatus爲0的操做,也就是還沒執行到下面喚醒後繼線程的代碼,不然後繼線程會再去爭取鎖。
那麼就算後繼爭用線程此時把本身阻塞了,也必定會立刻被前驅線程喚醒。
那麼是否可能持鎖線程執行喚醒後繼線程的邏輯時,後繼線程讀到前驅等待狀態爲SIGNAL把本身給阻塞,再也沒法甦醒呢?
這個問題在上面的問題3中已經有答案了,確實可能在掃描後繼須要喚醒線程時讀不到新來的線程,但只要tryRelease語義實現正確,在true時表示徹底釋放獨佔鎖,
則後繼線程理應可以tryAcquire成功,shouldParkAfterFailedAcquire在讀到前驅狀態不爲SIGNAL會給當前線程再一次獲取鎖的機會的。
別看AQS代碼寫的有些複雜,狀態有些多,還真的就是沒毛病,各類狀況都能覆蓋。

5.5 PROPAGATE狀態存在的意義

在setHeadAndPropagate中咱們能夠看到以下的一段代碼:

if (propagate > 0 || h == null || h.waitStatus < 0 ||
       (h = head) == null || h.waitStatus < 0) {
       Node s = node.next;
       if (s == null || s.isShared())
           doReleaseShared();
}

爲何不僅是用propagate > 0來判斷呢?咱們知道目前AQS代碼中的Node.PROPAGATE狀態就是爲了此處能夠讀取到h.waitStatus < 0(PROPAGATE值爲-3);若是這裏能夠只用propagate > 0來判斷,是否PROPAGATE狀態都沒有存在的必要了?

我接觸JAVA比較晚,接觸的時候就已是JDK8的年代了。這個問題我思考了好久,沒有想到很合理的解釋來講明PROPAGATE狀態存在的必要性。
在網上也鮮少有相關方面的資料、博客說起到這些。後來經過瀏覽Doug Lea的我的網站,發如今好久之前AQS的代碼確實是沒有PROPAGATE的,PROPAGATE的引入是爲了解決共享鎖併發釋放致使的線程hang住問題。

在Doug Lea的JSR 166 repository上,我找到了PROPAGATE最先被引入的那一版。能夠看到
Revision1.73中,PROPAGATE狀態被引入用以修復bug 6801020,讓咱們來看看這個bug:

import java.util.concurrent.Semaphore;

public class TestSemaphore {

   private static Semaphore sem = new Semaphore(0);

   private static class Thread1 extends Thread {
       @Override
       public void run() {
           sem.acquireUninterruptibly();
       }
   }

   private static class Thread2 extends Thread {
       @Override
       public void run() {
           sem.release();
       }
   }

   public static void main(String[] args) throws InterruptedException {
       for (int i = 0; i < 10000000; i++) {
           Thread t1 = new Thread1();
           Thread t2 = new Thread1();
           Thread t3 = new Thread2();
           Thread t4 = new Thread2();
           t1.start();
           t2.start();
           t3.start();
           t4.start();
           t1.join();
           t2.join();
           t3.join();
           t4.join();
           System.out.println(i);
       }
   }
}

很顯然,這段程序必定能執行結束的,可是會偶現線程hang住的問題。
當時的AQS中setHeadAndPropagate是這樣的:

以上是bug 6801020修復點的對比,左邊爲修復以前的版本,右邊爲引入PROPAGATE修復以後的版本。

從左邊能夠看到原先的setHeadAndPropagate相比目前版本要簡單不少,而releaseShared的實現也與release基本雷同,這也正是本問題的核心:爲何僅僅用調用的tryAcquireShared
獲得的返回值來判斷是否須要喚醒不行呢?

在PROPAGATE狀態出現以前的源碼能夠
在這裏查看

5.5.1 分析

讓咱們來分析一下上面的程序:

上面的程序循環中作的事情就是放出4個線程,其中2個線程用於獲取信號量,另外2個用於釋放信號量。每次循環主線程會等待全部子線程執行完畢。
出現bug也就是線程hang住的問題就在於兩個獲取信號量的線程有一個會沒辦法被喚醒,隊列就死掉了。

在AQS的共享鎖中,一個被park的線程,不考慮線程中斷和前驅節點取消的狀況,有兩種狀況能夠被unpark:一種是其餘線程釋放信號量,調用unparkSuccessor;
另外一種是其餘線程獲取共享鎖時經過傳播機制來喚醒後繼節點。

咱們假設某次循環中隊列裏排隊的節點爲狀況爲:
head -> t1的node -> t2的node(也就是tail)

信號量釋放的順序爲t3先釋放,t4後釋放:
時刻1: t3調用releaseShared,調用了unparkSuccessor(h),head的等待狀態從-1變爲0
時刻2: t1因爲t3釋放了信號量,被t3喚醒,調用Semaphore.NonfairSync的tryAcquireShared,返回值爲0
時刻3: t4調用releaseShared,讀到此時h.waitStatus爲0(此時讀到的head和時刻1中爲同一個head),不知足條件,所以不會調用unparkSuccessor(h)
時刻4: t1獲取信號量成功,調用setHeadAndPropagate時,由於不知足propagate > 0(時刻2的返回值也就是propagate==0),從而不會喚醒後繼節點

這就比如是一個精巧的多米諾骨牌最終因爲設計的失誤致使動力沒法傳遞下去,至此AQS中的同步隊列宣告死亡。

那麼引入PROPAGATE是怎麼解決問題的呢?
引入以後,調用releaseShared方法再也不簡單粗暴地直接unparkSuccessor,而是將傳播行爲抽了一個doReleaseShared方法出來。
再看上面的那種狀況:
時刻1:t3調用releaseShared -> doReleaseShared -> unparkSuccessor,完了以後head的等待狀態爲0
時刻2:t1因爲t3釋放了信號量,被t3喚醒,調用Semaphore.NonfairSync的tryAcquireShared,返回值爲0
時刻3:t4調用releaseShared,讀到此時h.waitStatus爲0(此時讀到的head和時刻1中爲同一個head),將等待狀態置爲PROPAGATE
時刻4:t1獲取信號量成功,調用setHeadAndPropagate時,能夠讀到h.waitStatus < 0,從而能夠接下來調用doReleaseShared喚醒t2

也就是說,上面會產生線程hang住bug的case在引入PROPAGATE後能夠被規避掉。在PROPAGATE引入以前,之因此可能會出現線程hang住的狀況,就是在於
releaseShared有競爭的狀況下,可能會有隊列中處於等待狀態的節點由於第一個線程完成釋放喚醒,第二個線程獲取到鎖,但還沒設置好head,又有新線程釋放鎖,可是讀到老的head狀態爲0致使釋放但不喚醒,最終後一個等待線程既沒有被釋放線程喚醒,也沒有被持鎖線程喚醒。

因此,僅僅靠tryAcquireShared的返回值來決定是否要將喚醒傳遞下去是不充分的。

5.6 AQS如何防止內存泄露

AQS維護了一個FIFO隊列,它是如何保證在運行期間不發生內存泄露的?
AQS在無競爭條件下,甚至都不會new出head和tail節點。
線程成功獲取鎖時設置head節點的方法爲setHead,因爲頭節點的thread並不重要,此時會置node的thread和prev爲null,
完了以後還會置原先head也就是線程對應node的前驅的next爲null,從而實現隊首元素的安全移出。
而在取消節點時,也會令node.thread = null,在node不爲tail的狀況下,會使node.next = node(之因此這樣也是爲了isOnSyncQueue實現更加簡潔)

6. 總結

AQS毫無疑問是Doug Lea大師使人歎爲觀止的做品,它實現精巧、魯棒、優雅,很好地封裝了同步狀態的管理、線程的等待與喚醒,足以知足大多數同步工具的需求。
閱讀AQS的源碼不是一蹴而就就能徹底讀懂的,閱讀源碼大體分爲三步:

  1. 讀懂大概思路以及一些重要方法之間的調用關係
  2. 逐行看代碼的具體實現,知道每一段代碼是幹什麼的
  3. 琢磨參悟某一段代碼爲何是這麼寫的,可否換一種寫法,可否先後幾行代碼調換順序,做者是怎麼想的

從Doug Lea大師的論文中,咱們也可以看出他設計並實現了AQS自己一方面是本人功力深厚,另外一方面也閱讀了大量的文獻與資料,也作了不少方面的測試。
讀AQS最難的地方不在於明白套路和思路,而在於代碼中點點滴滴的細節。從一行行的代碼角度來講,好比改一個值,是否須要CAS,是否必定要CAS成功;讀一個值,在多線程環境下含義是什麼,有哪些種狀況。從一個個方法角度來講,這些方法的調用關係是如何保證框架的正確性、魯棒性、伸縮性等。
若是能把這些細節都想清楚,明白做者的思路與考慮,才能夠源碼理解入木三分了。

對於PROPAGATE狀態,網上大多AQS的介紹也都只是淺顯地說起是用來設置傳播的,缺乏對於這個狀態存在必要性的思考。一開始我也想了好久不明白爲何必定須要一個PROPAGATE狀態而不是直接根據tryAcquireShared的返回值來判斷是否須要傳播。後來也是去了Doug Lea的我的網站翻出當時最先引入PROPAGATE狀態的提交,看到了原來的代碼,以及http://bugs.java.com/上的bug才更釐清PROPAGATE狀態引入的來龍去脈。

儘管看懂源碼,也可能遠遠達不到能再造一個能與之媲美的輪子的程度,可是能對同步框架、鎖、線程等有更深刻的理解,也是很豐碩的收穫了。
固然,AQS也有其侷限性,因爲維護的是FIFO隊列。若是想要實現一個具備優先級的鎖,AQS就派不上什麼用處了。

7. 參考

Doug Lea的AQS論文 The Art of Multiprocessor Programming(多處理器編程的藝術)

相關文章
相關標籤/搜索