Java併發編程之鎖機制之AQS

該文章屬於《Java併發編程》系列文章,若是想了解更多,請點擊《Java併發編程之總目錄》node

前言

在上篇文章《Java併發編程之鎖機制之Lock接口》中,咱們已經瞭解了,Java下整個Lock接口下實現的鎖機制是經過AQS(這裏咱們將AbstractQueuedSynchronizer 或AbstractQueuedLongSynchronizer統稱爲AQS)與Condition來實現的。那下面咱們就來具體瞭解AQS的內部細節與實現原理。編程

PS:該篇文章會以AbstractQueuedSynchronizer來進行講解,對AbstractQueuedLongSynchronizer有興趣的小夥伴,能夠自行查看相關資料。安全

AQS簡介

抽象隊列同步器AbstractQueuedSynchronizer (如下都簡稱AQS),是用來構建鎖或者其餘同步組件的基礎框架,它使用了一個int成員變量來表示同步狀態,經過內置的FIFO(first-in-first-out)同步隊列來控制獲取共享資源的線程。bash

該類被設計爲大多數同步組件的基類,這些同步組件都依賴於單個原子值(int)來控制同步狀態,子類必需要定義獲取獲取同步與釋放狀態的方法,在AQS中提供了三種方法getState()setState(int newState)compareAndSetState(int expect, int update)來進行操做。同時子類應該爲自定義同步組件的靜態內部類,AQS自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用,同步器既能夠支持獨佔式地獲取同步狀態,也能夠支持共享式地獲取同步狀態,這樣就能夠方便實現不一樣類型的同步組件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。多線程

AQS類方法簡介

AQS的設計是基於模板方法模式的,也就是說,使用者須要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。併發

修改同步狀態方法

在子類實現自定義同步組件的時候,須要經過AQS提供的如下三個方法,來獲取與釋放同步狀態。框架

  • int getState() :獲取當前同步狀態
  • void setState(int newState) :設置當前同步狀態
  • boolean compareAndSetState(int expect, int update) 使用CAS設置當前狀態。

子類中能夠重寫的方法

  • boolean isHeldExclusively():當前線程是否獨佔鎖
  • boolean tryAcquire(int arg):獨佔式嘗試獲取同步狀態,經過CAS操做設置同步狀態,若是成功返回true,反之返回false
  • boolean tryRelease(int arg):獨佔式釋放同步狀態。
  • int tryAcquireShared(int arg):共享式的獲取同步狀態,返回大於等於0的值,表示獲取成功,反之失敗。
  • boolean tryReleaseShared(int arg):共享式釋放同步狀態。

獲取同步狀態與釋放同步狀態方法

當咱們實現自定義同步組件時,將會調用AQS對外提供的方法同步狀態與釋放的方法,固然這些方法內部會調用其子類的模板方法。這裏將對外提供的方法分爲了兩類,具體以下所示:函數

  • 獨佔式獲取與釋放同步狀態
  1. void acquire(int arg):獨佔式獲取同步狀態,若是當前線程獲取同步狀態成功,則返回,不然進入同步隊列等待,該方法會調用tryAcquire(int arg)方法。
  2. void acquireInterruptibly(int arg):與 void acquire(int arg)基本邏輯相同,可是該方法響應中斷,若是當前沒有獲取到同步狀態,那麼就會進入等待隊列,若是當前線程被中斷(Thread().interrupt()),那麼該方法將會拋出InterruptedException。並返回
  3. boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly(int arg)的基礎上,增長了超時限制,若是當前線程沒有獲取到同步狀態,那麼將返回fase,反之返回true。
  4. boolean release(int arg) :獨佔式的釋放同步狀態
  • 共享式獲取與釋放同步狀態
  1. void acquireShared(int arg):共享式的獲取同步狀態,若是當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨佔式獲取的主要區別是在同一時刻能夠有多個線程獲取到同步狀態。
  2. void acquireSharedInterruptibly(int arg):在acquireShared(int arg)的基本邏輯相同,增長了響應中斷。
  3. boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly的基礎上,增長了超時限制。
  4. boolean releaseShared(int arg) :共享式的釋放同步狀態

AQS具體實現及內部原理

在瞭解了AQS中的針對不一樣方式獲取與釋放同步狀態(獨佔式與共享式)與修改同步狀態的方法後,如今咱們來了解AQS中具體的實現及其內部原理。工具

AQS中FIFO隊列

在上文中咱們提到AQS中主要經過一個FIFO(first-in-first-out)來控制線程的同步。那麼在實際程序中,AQS會將獲取同步狀態的線程構形成一個Node節點,並將該節點加入到隊列中。若是該線程獲取同步狀態失敗會阻塞該線程,當同步狀態釋放時,會把頭節點中的線程喚醒,使其嘗試獲取同步狀態。oop

Node節點結構

下面咱們就經過實際代碼來了解Node節點中存儲的信息。Node節點具體實現以下:

static final class Node {
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
    }
複製代碼

Node節點是AQS中的靜態內部類,下面分別對其中的屬性(注意其屬性都用volatile 關鍵字進行修飾)進行介紹。

  • int waitStatus:等待狀態主要包含如下狀態
  1. SIGNAL = -1:當前節點的線程若是釋放了或取消了同步狀態,將會將當前節點的狀態標誌位SINGAL,用於通知當前節點的下一節點,準備獲取同步狀態。
  2. CANCELLED = 1:被中斷或獲取同步狀態超時的線程將會被置爲當前狀態,且該狀態下的線程不會再阻塞。
  3. CONDITION = -2:當前節點在Condition中的等待隊列上,(關於Condition會在下篇文章進行介紹),其餘線程調用了Condition的singal()方法後,該節點會從等待隊列轉移到AQS的同步隊列中,等待獲取同步鎖。
  4. PROPAGATE = -3:與共享式獲取同步狀態有關,該狀態標識的節點對應線程處於可運行的狀態。
  5. 0:初始化狀態。
  • Node prev:當前節點在同步隊列中的上一個節點。
  • Node next:當前節點在同步隊列中的下一個節點。
  • Thread thread:當前轉換爲Node節點的線程。
  • Node nextWaiter:當前節點在Condition中等待隊列上的下一個節點,(關於Condition會在下篇文章進行介紹)。

AQS同步隊列具體實現結構

經過上文的描述咱們大概瞭解了Node節點中存儲的數據與信息,如今咱們來看看整個AQS下同步隊列的結構。具體以下圖所示:

aqs.png
在AQS中的同步隊列中,分別有兩個指針(你也能夠叫作對象的引用),一個 head指針指向隊列中的頭節點,一個 tail指針指向隊列中的尾節點。

AQS添加尾節點

當一個線程成功獲取了同步狀態(或者鎖),其餘線程沒法獲取到同步狀態,這個時候會將該線程構形成Node節點,並加入到同步隊列中,而這個加入隊列的過程必需要確保線程安全,因此在AQS中提供了一個基於CAS的設置尾節點的方法:compareAndSetTail(Node expect,Nodeupdate),它須要傳遞當前線程「認爲」的尾節點和當前節點,只有設置成功後,當前節點才正式與以前的尾節點創建關聯。具體過程以下圖所示:

aqs_save_tail.png
上圖中,虛線部分爲以前tail指向的節點。

AQS添加頭節點

在AQS中的同步隊列中,頭節點是獲取同步狀態成功的節點,頭節點的線程會在釋放同步狀態時,將會喚醒其下一個節點,而下一個節點會在獲取同步狀態成功時將本身設置爲頭節點,具體過程以下圖所示:

aqs_save_head.png

上圖中,虛線部分爲以前head指向的節點。由於設置頭節點是獲取同步狀態成功的線程來完成的,因爲只有一個線程可以成功獲取到同步狀態,所以設置頭節點的方法並不須要CAS來進行保證,只須要將原頭節點的next指向斷開就好了。

如今咱們已經瞭解了AQS中同步隊列的頭節點與尾節點的設置過程。如今咱們根據實際代碼進行分析,由於涉及到不一樣狀態對同步狀態的獲取(獨佔式與共享式),因此下面會分別對這兩種狀態進行講解。

獨佔式同步狀態獲取與釋放

獨佔式同步狀態獲取

經過acquire(int arg)方法咱們能夠獲取到同步狀態,可是須要注意的是該方法並不會響應線程的中斷與獲取同步狀態的超時機制。同時即便當前線程已經中斷了,經過該方法放入的同步隊列的Node節點(該線程構造的Node),也不會從同步隊列中移除。具體代碼以下所示:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

在該方法中,主要經過子類重寫的方法tryAcquire(arg)來獲取同步狀態,若是獲取同步狀態失敗,則會將請求線程構造獨佔式Node節點(Node.EXCLUSIVE),同時將該線程加入同步隊列的尾部(由於AQS中的隊列是FIFO類型)。接着咱們查看addWaiter(Node mode)方法具體細節:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//將該線程構形成Node節點
  
        Node pred = tail;
        if (pred != null) {//嘗試將尾指針 tail 指向當前線程構造的Node節點
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
              //若是成功,那麼將尾指針以前指向的節點的next指向 當前線程構造的Node節點
                pred.next = node;
                return node;
            }
        }
        enq(node);//若是當前尾指針爲null,則調用enq(final Node node)方法
        return node;
    }
複製代碼

在該方法中,主要分爲兩個步驟:

  • 若是當前尾指針(tail)不爲null,那麼嘗試將尾指針 tail 指向當前線程構造的Node節點,若是成功,那麼將尾指針以前指向的節點的next指向當前線程構造的Node節點,並返回當前節點。
  • 反之調用enq(final Node node)方法,將當前線程構造的節點加入同步隊列中。

接下來咱們繼續查看enq(final Node node)方法。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {//若是當前尾指針爲null,那麼嘗試將頭指針 head指向當前線程構造的Node節點
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//若是當前尾指針(tail)不爲null,那麼嘗試將尾指針 tail 指向當前線程構造的Node節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

複製代碼

在enq(final Node node)方法中,經過死循環(你也能夠叫作自旋)的方式來保證節點的正確的添加。接下來,咱們繼續查看acquireQueued(final Node node, int arg)方法的處理。該方法纔是整個多線程競爭同步狀態的關鍵,你們必定要注意看!!!

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//獲取該節點的上一節點
                //若是上一節點是head鎖指向的節點,且該節點獲取同步狀態成功
                if (p == head && tryAcquire(arg)) {
		            //設置head指向該節點,
                    setHead(node);
                    p.next = null; // 將上一節點的next指向斷開
                    failed = false;
                    return interrupted;
                }
                //判斷獲取同步狀態失敗的線程是否須要阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//阻塞並判斷當前線程是否已經中斷了
                    interrupted = true;
            }
        } finally {
            if (failed)
            //若是線程中斷了,那麼就將該線程從同步隊列中移除,同時喚醒下一節點
                cancelAcquire(node);
        }
    }
複製代碼

在該方法中主要分爲三個步驟:

  • 經過死循環(你也能夠叫作自旋)的方式來獲取同步狀態,若是當前節點的上一節點是head指向的節點該節點獲取同步狀態成功,那麼會設置head指向該節點 ,同時將上一節點的next指向斷開。
    aqs_self_rotate.png

aqs_get_head.png

  • 若是當前節點的上一節點不是head指向的節點,或者獲取當前節點同步狀態失敗,那麼會先調用shouldParkAfterFailedAcquire(Node pred, Node node)方法來判斷是須要否阻塞當前線程,若是該方法返回true,則調用parkAndCheckInterrupt()方法來阻塞線程。若是該方法返回false,那麼該方法內部會把當前節點的上一節點的狀態修改成Node.SINGAL。
  • 在finally語句塊中,判斷當前線程是否已經中斷。若是中斷,則經過那麼cancelAcquire(Node node)方法將該線程(對應的Node節點)從同步隊列中移除,同時喚醒下一節點。

下面咱們接着來看shouldParkAfterFailedAcquire(Node pred, Node node)方法,看看具體的阻塞具體邏輯,代碼以下所示:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
	        //上一節點已經設置狀態請求釋放信號,所以當前節點能夠安全地阻塞
            return true;
        if (ws > 0) {
	        //上一節點,已經被中斷或者超時,那麼接跳過全部狀態爲Node.CANCELLED
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
	        //其餘狀態,則調用cas操做設置狀態爲Node.SINGAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
複製代碼

在該方法中會獲取上一節點的狀態(waitStatus),而後進行下面的三個步驟的判斷。

  • 若是上一節點狀態爲Node.SIGNAL,那麼會阻塞接下來的線程(函數 return true)
  • 若是上一節點的狀態大於0(從上文描述的waitStatus全部狀態中,咱們能夠得知只有Node.CANCELLED大於0)那麼會跳過整個同步列表中全部狀態爲Node.CANCELLED的Node節點。(函數 return false)
  • 若是上一節點是其餘狀態,則調用CAS操做設置其狀態爲Node.SINGAL。(函數 return false)
阻塞實現

shouldParkAfterFailedAcquire(Node pred, Node node)方法返回true時,接着會調用parkAndCheckInterrupt()方法來阻塞當前線程。該方法的返回值爲當前線程是否中斷。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
複製代碼

在該方法中,主要阻塞線程的方法是經過LockSupport(在後面的文章中會具體介紹)的park來阻塞當前線程。

從同步隊列中移除,同時喚醒下一節點

經過對獨佔式獲取同步狀態的理解,咱們知道 acquireQueued(final Node node, int arg)方法中最終會執行finally語句塊中的代碼,來判斷當前線程是否已經中斷。若是中斷,則經過那麼cancelAcquire(Node node)方法將該線程從同步隊列中移除。那麼接下來咱們來看看該方法的具體實現。具體代碼以下:

private void cancelAcquire(Node node) {
        //若是當前節點已經不存在直接返回
        if (node == null)
            return;
		//(1)將該節點對應的線程置爲null
        node.thread = null;

        //(2)跳過當前節點以前已經取消的節點
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

		//獲取在(2)操做以後,節點的下一個節點
        Node predNext = pred.next;

	    //(3)將當前中斷的線程對應節點狀態設置爲CANCELLED
        node.waitStatus = Node.CANCELLED;

        //(4)若是當前中斷的節點是尾節點,那麼則將尾節點從新指向
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            //(5)若是中斷的節點的上一個節點的狀態,爲SINGAL或者即將爲SINGAL,
            //那麼將該當前中斷節點移除
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);//(6)將該節點移除,同時喚醒下一個節點
            }

            node.next = node; // help GC
        }
    }
複製代碼

觀察上訴代碼,咱們能夠知道該方法幹了如下這幾件事

  • (1)將中斷線程對應的節點對應的線程置爲null

  • (2)跳過當前節點以前已經取消的節點(咱們已經知道在Node.waitStatus的枚舉中,只有CANCELLED 大於0 )

    跳過已經取消的節點.png

  • (3)將當前中斷的線程對應節點狀態設置爲CANCELLED

  • (4)在(2)的前提下,若是當前中斷的節點是尾節點,那麼經過CAS操做將尾節點指向(2)操做後的的節點。

從新設置尾節點Tail.png

  • (5)若是當前中斷節點不是尾節點,且當前中斷的節點的上一個節點的狀態,爲SINGAL或者即將爲SINGAL,那麼將該當前中斷節點移除。
  • (6)若是(5)條件不知足,那麼調用unparkSuccessor(Node node)方法將該節點移除,同時喚醒下一個節點。具體代碼以下:
private void unparkSuccessor(Node node) {
         //重置該節點爲初始狀態
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //獲取中斷節點的下一節點    
        Node s = node.next;
        //判斷下一節點的狀態,若是爲Node.CANCELED狀態
        if (s == null || s.waitStatus > 0) {
            s = null;
            //則經過尾節點向前遍歷,獲取最近的waitStatus<=0的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //若是該節點不會null,則喚醒該節點中的線程。
        if (s != null)
            LockSupport.unpark(s.thread);
    }
複製代碼

這裏爲了方便你們理解,我仍是將圖補充了出來,(圖片有可能不是很清晰,建議你們點擊瀏覽大圖),

aqs.png
總體來講,unparkSuccessor(Node node)方法主要是獲取中斷節點後的可用節點(Node.waitStatus<=0),而後將該節點對應的線程喚醒。

獨佔式同步狀態釋放

當線程獲取同步狀態成功並執行相應邏輯後,須要釋放同步狀態,使得後繼線程節點可以繼續獲取同步狀態,經過調用AQS的relase(int arg)方法,能夠釋放同步狀態。具體代碼以下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

在該方法中,會調用模板方法tryRelease(int arg),也就是說同步狀態的釋放邏輯,是須要用戶來本身定義的。當tryRelease(int arg)方法返回true後,若是當前頭節點不爲null且頭節點waitStatus!=0,接着會調用unparkSuccessor(Node node)方法來喚醒下一節點(使其嘗試獲取同步狀態)。關於unparkSuccessor(Node node)方法,上文已經分析過了,這裏就再也不進行描述了。

共享式同步狀態獲取與釋放

共享式獲取與獨佔式獲取最主要的區別在於同一時刻是否能有多個線程同時獲取到同步狀態。以文件的讀寫爲例,若是一個程序在對文件進行讀操做,那麼這一時刻對於文件的寫操做均會被阻塞。而其餘讀操做可以同時進行。若是對文件進行寫操做,那麼這一時刻其餘的讀寫操做都會被阻塞,寫操做要求對資源的獨佔式訪問,而讀操做能夠是共享訪問的。

共享式同步狀態獲取

在瞭解了共享式同步狀態獲取與獨佔式獲取同步狀態的區別後,如今咱們來看一看共享式獲取的相關方法。在AQS中經過 acquireShared(int arg)方法來實現的。具體代碼以下:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
複製代碼

在該方法內部會調用模板方法tryAcquireShared(int arg),同獨佔式獲取獲取同步同步狀態同樣,也是須要用戶自定義的。當tryAcquireShared(int arg)方法返回值小於0時,表示沒有獲取到同步狀態,則調用doAcquireShared(int arg)方法獲取同步狀態。反之,已經獲取同步狀態成功,則不進行任何的操做。關於doAcquireShared(int arg)方法具體實現以下所示:

private void doAcquireShared(int arg) {
	    //(1)添加共享式節點在AQS中FIFO隊列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //(2)自旋獲取同步狀態
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
	                    //當獲取同步狀態成功後,設置head指針
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //(3)判斷線程是否須要阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
	        //(4)若是線程已經中斷,則喚醒下一節點
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

總體來看,共享式獲取的邏輯與獨佔式獲取的邏輯幾乎同樣,仍是如下幾個步驟:

  • (1)添加共享式節點在AQS中FIFO隊列中,這裏須要注意節點的構造爲 addWaiter(Node.SHARED),其中 Node.SHARED爲Node類中的靜態常量(static final Node SHARED = new Node()),且經過addWaiter(Node.SHARED)方法構造的節點狀態爲初始狀態,也就是waitStatus= 0

  • (2)自旋獲取同步狀態,若是當前節點的上一節點爲head節點,其獲取同步狀態成功,那麼將調用setHeadAndPropagate(node, r);,從新設置head指向當前節點。同時從新設置該節點狀態waitStutas = Node.PROPAGATE(共享狀態),而後直接退出doAcquireShared(int arg)方法。具體狀況以下圖所示:

共享式自旋判斷.png

  • (3)若是不知足條件(2),那麼會判斷當前節點的上一節點不是head指向的節點,或者獲取當前節點同步狀態失敗,那麼會先調用shouldParkAfterFailedAcquire(Node pred, Node node)方法來判斷是須要否阻塞當前線程,若是該方法返回true,則調用parkAndCheckInterrupt()方法來阻塞線程。若是該方法返回false,那麼該方法內部會把當前節點的上一節點的狀態修改成Node.SINGAL。具體狀況以下圖所示:

共享式線程阻塞.png

  • (4)若是線程已經中斷,則喚醒下一節點

前面咱們提到了,共享式與獨佔式獲取同步狀態的主要不一樣在於其設置head指針的方式不一樣,下面咱們就來看看共享式設置head指針的方法setHeadAndPropagate(Node node, int propagate)。具體代碼以下:

private void setHeadAndPropagate(Node node, int propagate) {
	    //(1)設置head 指針,指向該節點
        Node h = head; // Record old head for check below
        setHead(node);
        
        //(2)判斷是否執行doReleaseShared();
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //若是當前節點的下一節點是共享式獲取同步狀態節點,則調用doReleaseShared()方法
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
複製代碼

在setHeadAndPropagate(Node node, int propagate)方法中有兩個參數。 第一個參數node是當前共享式獲取同步狀態的線程節點。 第二個參數propagate(中文意思,繁殖、傳播)是共享式獲取同步狀態線程節點的個數。

其主要邏輯步驟分爲如下兩個步驟:

  • (1)設置head 指針,指向該節點。從中咱們能夠看出在共享式獲取中,Head節點老是指向最進獲取成功的線程節點!!!
  • (2)判斷是否執行doReleaseShared(),從代碼中咱們能夠得出,主要經過該條件if (s == null || s.isShared()),其中 s爲當前節點的下一節點(也就是說同一時刻有可能會有多個線程同時訪問)。當該條件爲true時,會調用doReleaseShared()方法。關於怎麼判斷下一節點是不是否共享式線程節點,具體邏輯以下:
//在共享式訪問中,當前節點爲SHARED類型
   final Node node = addWaiter(Node.SHARED);
   
   //在調用addWaiter 內部會調用Node構造方法,其中會將nextWaiter設置爲Node.SHARED。
   Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
   //SHARED爲Node類靜態類    
   final boolean isShared() {
            return nextWaiter == SHARED;
        }
        
複製代碼

下面咱們繼續查看doReleaseShared()方法的具體實現,具體代碼以下所示:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
	                //(1)從上圖中,咱們能夠得知在共享式的同步隊列中,若是存在堵塞節點,
	                //那麼head所指向的節點狀態確定爲Node.SINGAL,
	                //經過CAS操做將head所指向的節點狀態設置爲初始狀態,若是成功就喚醒head下一個阻塞的線程
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//喚醒下一節點線程,上文分析過該方法,這裏就不在講了
                }
				//(2)表示該節點線程已經獲取共享狀態成功,則經過CAS操做將該線程節點狀態設置爲Node.PROPAGATE
				//從上圖中,咱們能夠得知在共享式的同步隊列中,
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   //若是head指針發生改變一直循環,不然跳出循環
                break;
        }
    }
複製代碼

從代碼中咱們能夠看出該方法主要分爲兩個步驟:

  • (1)從上圖中,咱們能夠得知在共享式的同步隊列中,若是存在堵塞節點,那麼head所指向的節點狀態確定爲Node.SINGAL,經過CAS操做將head所指向的節點狀態設置爲初始狀態,若是成功就喚醒head下一個阻塞的線程節點,反之繼續循環。
  • (2)若是(1)條件不知足,那麼說明該節點已經獲取成功的獲取同步狀態,那麼經過CAS操做將該線程節點的狀態設置爲waitStatus = Node.PROPAGATE,若是CAS操做失敗,就一直循環。
共享式同步狀態釋放

當線程獲取同步狀態成功並執行相應邏輯後,須要釋放同步狀態,使得後繼線程節點可以繼續獲取同步狀態,經過調用AQS的releaseShared(int arg)方法,能夠釋放同步狀態。具體代碼以下:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
複製代碼

獨佔式與共享式超時獲取同步狀態

由於獨佔式與共享式超時獲取同步狀態,與其自己的非超時獲取同步狀態邏輯幾乎同樣。因此下面就以獨佔式超時獲取同步狀態的相應邏輯進行講解。

在獨佔式超時獲取同步狀態中,會調用tryAcquireNanos(int arg, long nanosTimeout)方法,其中具體nanosTimeout參數爲你傳入的超時時間(單位納秒),具體代碼以下所示:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
複製代碼

觀察代碼,咱們能夠得知若是當前線程已經中斷,會直接拋出InterruptedException,若是當前線程可以獲取同步狀態( 調用tryAcquire(arg)),那麼就會直接返回,若是當前線程獲取同步狀態失敗,則調用doAcquireNanos(int arg, long nanosTimeout)方法來超時獲取同步狀態。那下面咱們接着來看該方法具體代碼實現,代碼以下圖所示:

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //(1)計算超時等待的結束時間
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //(2)若是獲取同步狀態成功,直接返回
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                //若是獲取同步狀態失敗,計算的剩下的時間
                nanosTimeout = deadline - System.nanoTime();
                //(3)若是超時直接退出
                if (nanosTimeout <= 0L)
                    return false;
                //(4)若是沒有超時,且nanosTimeout大於spinForTimeoutThreshold(1000納秒)時,
                //則讓線程等待nanosTimeout (剩下的時間,單位:納秒。)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //(5)若是當前線程被中斷,直接拋出異常    
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

整個方法爲如下幾個步驟:

  • (1)在線程獲取同步狀態以前,先計算出超時等待的結束時間。(單位精確到納秒)
  • (2)經過自旋操做獲取同步狀態,若是成功,則直接返回
  • (3)若是獲取同步失敗,則計算剩下的時間。若是已經超時了就直接退出。
  • (4)若是沒有超時,則判斷當前剩餘時間nanosTimeout是否大於spinForTimeoutThreshold(1000納秒),若是大於,則經過 LockSupport.parkNanos(this, nanosTimeout)方法讓線程等待相應時間。(該方法會在根據傳入的nanosTimeout時間,等待相應時間後返回。),若是nanosTimeout小於等於spinForTimeoutThreshold時,將不會使該線程進行超時等待,而是進入快速的自旋過程。緣由在於,很是短的超時等待沒法作到十分精確,若是這時再進行超時等待,相反會讓nanosTimeout的超時從總體上表現得反而不精確。所以,在超時很是短的場景下,線程會進入無條件的快速自旋。
  • (5)在沒有走(4)步驟的狀況下,表示當前線程已經被中斷了,則直接拋出InterruptedException

最後

到如今咱們基本瞭解了整個AQS的內部結構與其獨佔式與共享式獲取同步狀態的實現,可是其中涉及到的線程的阻塞、等待、喚醒(與LockSupport工具類相關)相關知識點咱們都沒有具體介紹,後續的文章會對LockSupport工具以及後期關於鎖相關的等待/通知模式相關的Condition接口進行介紹。但願你們繼續保持着學習的動力~~。

總結

  • 整個AQS是基於其內部的FIFO隊列實現同步控制。請求的線程會封裝爲Node節點。
  • AQS分爲總體分爲獨佔式與共享式獲取同步狀態。其支持線程的中斷,與超時獲取。
相關文章
相關標籤/搜索