AbstractQueuedSynchronizer(AQS)抽絲剝繭深刻了解JUC框架原理

簡介

AQS(AbstractQueuedSynchronizer)是併發開發中一個基礎組件。主要實現了同步狀態管理、線程隊列管理、線程等待、線程喚醒等底層操做。JDK中許多的併發類都是依賴AQS的。 ReentrantLock(可重入鎖)、Semaphore(信號量)、CountDownLatch(計數器)。java

Lock簡單實用

  • 介紹原理前咱們簡單來看看Lock使用。
public static void main(String[] args) {
    Integer index = 0;
    ReentrantLock lock = new ReentrantLock();
    List<Thread> threadList = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        int finalI = i;
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.lock();
                System.out.println(finalI);
                lock.unlock();
            }
        });
        threadList.add(thread);
    }
    for (Thread thread : threadList) {
        thread.start();
    }
}
  • 就是lock 和unlock的使用。就可以保證中間的業務是有序執行的。上面不會保證輸出數字有序,可是能保證輸出的個數是100個,由於這裏咱們理解成他們會進入隊列中。可是進入的順序不肯定。那麼下面咱們看看lock 、unlock 與咱們今天的主角AQS有什麼關係。

主體框架

AQS提供了一個依賴FIFO(先進先出)等待隊列的阻塞鎖和同步器的框架。該類是一個抽象類。其中暴露出來的方法主要用來操做狀態和類別判斷。這些方法咱們不須要考慮阻塞問題,由於在AQS中調用這些方法的地方會處理阻塞問題node

方法 描述
boolean tryAcquire(int args) 嘗試獲取獨佔鎖
boolean tryRelease(int args) 嘗試釋放獨佔鎖
int tryAcquireShared(int args) 嘗試獲取共享鎖
boolean tryReleaseShared(int args) 嘗試釋放共享鎖
boolean isHeldExclusively() 當前線程是否得到了獨佔鎖

其餘方法有AQS類實現。在AQS中實現的方法會調用到上面的抽象方法。正常子類是已內部類方式呈現的。這樣的好處能夠作到封閉式的同步屬性。AQS內部實現的方法大概介紹數據結構

方法 描述
void acquire(int args) 獲取獨佔鎖,內部調用tryAcquire方法,
void acquireInterruptibly(int args) 響應中斷版本的acquire
boolean tryAcquireNanos(int args , long nanos) 響應中斷+超時版本的acquire
void acquireShared(int args) 獲取共享鎖,內部調用tryAcquireShared方法
void acquireSharedInterruptibly(int args) 響應中斷版本的獲取共享鎖
boolean tryAcquireSharedNonos(int args,long nanos) 響應中斷+超時獲取共享鎖
boolean release(int args) 釋放獨佔鎖
boolean releaseShared(int args) 釋放共享鎖
Collection getQueuedThreads() 獲取同步隊列上的線程集合

原理解析

AQS內部是經過一個雙向鏈表來管理鎖的(俗稱CLH隊列)。
當前程嘗試獲取鎖失敗時,會將當前線程包裝成AQS內部類Node對象加入到CLH隊列中,並將當前線程掛起。當有線程釋放本身的鎖時AQS會嘗試喚醒CLH隊列中head後的直接後繼的線程。AQS的status咱們能夠根據他來作成不一樣的需求。這個後續再說。下面咱們已ReentrantLock來講明下AQS原理。多線程

  • 上面標註的是ReentrantLock中的lock方法。這個方法表示去上鎖。瞭解Lock的都知道這個方法會一直阻塞住知道上鎖成功纔會執行完。而ReentrantLock.lock方法實際上的sync對象去上鎖的。而sync在ReentrantLock中有公平鎖和非公平鎖兩種。

  • 在AQS中默認的是非公平鎖,即隨機喚醒線程。


  • 經過上面繼承關係咱們發現了咱們今天的主角-AbstractQueueSynchronizer 。

  • NonfairSync實現了兩個方法lock、tryAcquire方法。其中lock就是經過狀態位實現鎖機制的。0-未上鎖;1-已上鎖 。 lock的邏輯就是若是上鎖成功會將狀態置爲1且設置獨佔模式的所屬線程爲當前線程。不然調用acquire嘗試獲取鎖。

獨佔鎖

AQS數據結構

  • AQS裏面主要是狀態位的管理。下面咱們看看包含的屬性
Class AbstractQueuedSynchronizer{
    /*隊列中的頭結點,無實際意義,head的後繼節點纔是隊列中的第一個節點*/
    private transient volatile Node head;
    /*隊列中的尾節點*/
    private transient volatile Node tail;
    /*隊列中的狀態,上鎖解鎖 能夠擴展成不一樣的狀態  。 AQS實際上也是對該字段的管理。子類中經過get set compare方法對state的管理*/
    private volatile int state;
}

CLH數據結構

  • 上面咱們瞭解到會將線程包裝成Node對象加入到雙向鏈表(CLH)中。下面咱們看看Node的結構吧
static final class Node {
    /*共享模式的標記*/
    static final Node SHARED = new Node();
    /*獨佔模式的標記*/
    static final Node EXCLUSIVE = null;
    /*隊列等待狀態-取消*/
    static final int CANCELLED =  1;
    /*隊列等待狀態-喚醒*/
    static final int SIGNAL    = -1;
    /*隊列等待狀態-條件等待*/
    static final int CONDITION = -2;
    /*隊列等待狀態-廣播*/
    static final int PROPAGATE = -3;
    /*隊列等待狀態,取值範圍就是上面的等待狀態之一*/
    volatile int waitStatus;
    /*前驅節點*/
    volatile Node prev;
    /*後繼節點*/
    volatile Node next;
    /*節點對應的線程:綁定關係*/
    volatile Thread thread;
    /*TODO*/
    Node nextWaiter;
    /*斷定是不是共享模式*/
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    /*獲取當前節點的前驅節點,若是沒有前驅節點拋出NullPointerException*/
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    /*用於建立雙向鏈表中的Head節點,其實Head節點就是一個標誌並不會與線程掛鉤。至關於一個隊列的默認頭節點。或者用來建立共享模式的節點。由於共享模式的節點就是無參構造*/
    Node() {
    }
    /*將線程包裝成Node對象加入隊列中,源碼中是用來添加Thread至隊列*/
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    /*經常使用語加入條件狀態隊列中TODO*/
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

acquire實現步驟

  • 上面咱們瞭解到Lock中實現lock的底層是AQS的acquire實現的。併發

  • 經過查看源碼咱們大概能瞭解到其上鎖的流程,框架

    • 首先嚐試獲取鎖
    • 獲取鎖失敗後,將當前線程包裝成Node對象添加到CLH隊列中
    • 自行阻塞當前線程,等待隊列喚醒本身
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter

/**
 * 經過Node對象的構造函數構造Node對象添加到CLH隊列中
 * 這個方法主要是雙向鏈表的操做。C++的同窗應該會很容易理解
 */
private Node addWaiter(Node mode) {
    /*當前線程加入隊列後此時是沒有後繼節點的,且已獨佔模式訪問的
    *因此這裏加入的Node在上一不傳入的是Node.EXCLUSIVE,這裏就表示
    *是已獨佔模式進行上鎖從而進行加入隊列的
    */
    Node node = new Node(Thread.currentThread(), mode);
    /*獲取隊列中的最後一個Node節點;這裏是進行快速插入測試。
    *默認隊列已經在堆積Node節點了這個時候直接將節點追加到tail裏。
    *其實這裏和enq()方法是同樣的邏輯。只不過enq裏面會進行等待隊列
    *正常纔會加入
    */
    Node pred = tail;
    if (pred != null) {
        /*隊列已經產生線程等待就會將當前node節點的前驅節點只爲tail
        *的複製節點
        */
        node.prev = pred;
        /*基於CAS(內部UnSafe實現)設置尾部爲node節點*/
        if (compareAndSetTail(pred, node)) {
            /*本來的tail節點的後繼節點天然就是node節點*/
            pred.next = node;
            /*到這裏node節點就已經加入了CLH隊列中*/
            return node;
        }
    }
    /*邏輯同上,不在贅述*/
    enq(node);
    return node;
}

acquireQueued

  • 這裏傳的Node是咱們上一步剛剛添加到隊尾的節點。爲何不直接用tail節點呢?咱們仔細觀察發現tail的修飾
private transient volatile Node tail;
  • 咱們知道volatile是內存可見的。什麼叫內存可見。咱們的屬性變量是存儲在內存中的。每次有線程啓動訪問這個類的時候都會複製內存中屬性值到本身線程中。因此在多線程狀況下修改了這個屬性就會出現問題由於A線程修改了值可是B線程並沒有法感知仍是以原先的值進行交互。這就是典型的多線程帶來的問題。而volatile作到了的線程感知。當A線程修改了tail後立馬B線程就感知到了。可是這並不能完全的解決多併發的問題。這裏咱們簡單介紹下這個關鍵字
  • 通過上面簡單闡述高併發場景,因此這裏不能直接用tail。由於這個時候tail頗有可能已經不是咱們的tail的。這裏直接傳遞Node節點是很是明智的選擇。並且是final修飾的。更加保證了使咱們上一步驟添加到隊尾的那個節點
/**
 * 再次嘗試獲取鎖,對中斷不敏感。
 */
final boolean acquireQueued(final Node node, int arg) {
    /*失敗標誌位*/
    boolean failed = true;
    try {
        /*線程是否被打斷標誌位*/
        boolean interrupted = false;
        /**/
        for (;;) {
            /*獲取當前想成包裝的Node節點的前驅節點*/
            final Node p = node.predecessor();
            /*若是前驅節點是head節點表示當前節點在隊首能夠嘗試
            *獲取下鎖,這裏爲何是嘗試獲取呢由於這個時候可能鎖
            *還被其餘線程佔着。這裏嘗試獲取純粹就是試試機會
            */
            if (p == head && tryAcquire(arg)) {
                /*成功獲取到鎖,說明咱們試一試的心態成功了。
                *人生也同樣,總得試一試萬一成功了呢。看源碼還
                *能學到人生道理呢。劃重點
                */
                /*這個時候在tryAcquire中已經被當前線程佔用了鎖了。
                *咱們這裏不須要擔憂其餘線程會搶佔,這個時候咱們
                *須要將當前線程從隊列中踢出,直接將當前線程置爲
                *head節點。setHead方法也很簡單,將node的前驅節
                *點置爲null,由於head是首位,首位以前不該該在
                *有節點了,而後線程也被銷燬了
                */
                setHead(node);
                /*p節點是老的head節點這個時候已經不須要了。
                *這裏jdk的操做是將next至爲null, 這樣p節點
                *就成爲不可達狀態,接下來的命運就是等待被GC。
                *這裏咱們不是將p置爲null的緣由是咱們p=null  , 
                *只是將p指向null, 可是原先的head的那個Node的
                *地址任然經過Node進行指向,GC是沒法回收的。好好理解下*/
                p.next = null; // help GC
                /*這裏咱們已經獲取了。並且成功上了鎖。因此這裏就
                * 沒法取消獲取了,並且咱們已經將Node剔除了,也
                * 沒有必要再進行取消獲取操做了。因此在finnally中
                * 就不必執行了*/
                failed = false;
                /*返回線程是否被中斷狀態*/
                return interrupted;
            }
            /*若是當前線程對應的Node節點不是head的後繼節點或者
            * 沒有獲取到鎖,這個時候咱們開始阻塞線程*/
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            /*取消當前線程對應的Node節點在隊列中排隊。這裏能夠
            *理解成棄權操做。這裏取消會順便遍歷以前的節點若是
            * 有棄權的這裏會一併操做掉
            */
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire

/**
 * 在失敗獲取鎖的狀況下判斷是否須要對線程進行阻塞並贊成修改線程
 * 在隊列中狀態。若是前驅節點是SIGNAL狀態那麼node節點就進入
 * 準備狀態。前驅節點CANEL狀態須要剔除。若是是CONDITION或者
 * PROGAGATE狀態,在ReentrantLock中咱們暫時不考慮這二者狀況,
 * 因此這裏就強制轉換爲SIGNAL狀態
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    /*獲取前驅節點的狀態*/
    int ws = pred.waitStatus;
    /*若是前驅節點是等待通知狀態,那麼當前節點須要等待前驅
    * 結點被喚醒,因此這裏須要被阻塞
    */
    if (ws == Node.SIGNAL)
        return true;
    /*若是前驅節點>0,即爲canclled狀態*/
    if (ws > 0) {
        //這裏其實和cancelAcquire邏輯差很少,須要將取消的節點從隊列中剔除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*剩下的狀況,統一將節點狀態更正爲等待通知狀態*/
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

/**
 * 阻塞當前線程,等待被喚醒
 */
private final boolean parkAndCheckInterrupt() {
    /*這裏就是阻塞線程,並等待LockSupport.unpark喚醒*/
    LockSupport.park(this);
    /*在park以後咱們須要Thread.interrupted恢復下線程的中斷狀態,
    * 這樣下一次park纔會生效。不然下一次的park不會生效的
    */
    return Thread.interrupted();
}

cancelAcquire

/**
 * 將node節點以前(包括當前node)取消狀態的所有剔除
 */
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    /*剔除操做須要解綁node和thread關係*/
    node.thread = null;
    /*獲取node的前驅節點*/
    Node pred = node.prev;
    /*大於0就是取消狀態*/
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    /*這裏直接置爲取消狀態,是爲了方便其餘線程進行取消是的操做,
    * 也是爲了方便跳躍該節點
    */
    node.waitStatus = Node.CANCELLED;
    /*若是node是隊尾的haul,那麼將隊尾設置成node的前驅結點*/
    if (node == tail && compareAndSetTail(node, pred)) {
        /*將隊尾的pred節點的後繼節點置空,這是一個隊列的標準要求*/
        compareAndSetNext(pred, predNext, null);
    } else {
        //若是是非隊尾節點
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            /*pred節點狀態若是是有效節點且不是head,將pred的後繼
            * 節點指向node的後繼節點。這裏和C++指針指向是一個道理*/
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                /*node的後繼節點是有效節點且不是取消狀態,進行替換*/
                compareAndSetNext(pred, predNext, next);
        } else {
            /*
            * 這裏就是對上面提到的阻塞進行放行。裏面
            * 其實是LockSupport.unpark進行放行的。
            * 這個時候咱們經過上面的if知道,這個時候在如下幾種場景出現
            * 一、pred==head
            * 二、pred是取消狀態
            * 三、pred.thread==null 即不是有效節點
            * 以上這些狀況都表示pred不是能進行喚醒的節點,咱們
            * 這裏理解爲不是標準節點。這個時候爲了保證隊列的活躍性,
            * 咱們須要喚醒後繼節點,實際上就是node的後繼節點。
            */
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
  • 在上面代碼中當代碼執行到unparkSuccessor(node)這一塊時就會去喚醒node節點。可是咱們的canelAcquire方法是爲了取消node節點以前取消狀態的節點的。這樣就會與咱們功能違背。命名方法是爲了剔除canel節點。如今確實去喚醒node節點。這裏咱們上面shouldParkAfterFailedAcquire方法中在狀態>0時回去自動剔除這些節點的。這樣就實現了canelAcquire方法的功能了。因此咱們不須要糾結。
    ps: 源碼終究是源碼,考慮的是很是全面的。
if (ws > 0) {
    //這裏其實和cancelAcquire邏輯差很少,須要將取消的節點從隊列中剔除
    do {
        node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
}

unparkSuccessor

/**
 * 喚醒node節點
 */
private void unparkSuccessor(Node node) {
    /*獲取當前節點的狀態*/
    int ws = node.waitStatus;
    /*對狀態進行判斷*/
    if (ws < 0)
        /*若果小於0,則進行強制糾偏爲0*/
        compareAndSetWaitStatus(node, ws, 0);
    /*獲取當前節點的後繼節點*/
    Node s = node.next;
    /*判斷*/
    if (s == null || s.waitStatus > 0) {
        /*後繼節點爲有效節點且狀態>0 , 這裏即爲CANCELLED狀態,
        * 則將該節點在CLH中剔除,並進行斷層鏈接*/
        s = null;
        /*這裏和向前去除取消狀態的前驅節點同樣,只不過這裏是向後
         *至於爲何是從後向前呢,是爲了不高併發帶來的節點不一
         * 致性。由於從node開始日後的話,頗有可能後面會被其餘
         * 線程修改了。由於添加節點的日後添加的。因此從後往前的話這樣能保證數據一致。可是這樣就會致使其餘線程添加的節點是沒法訪問到的。這一點和數據一致性比較仍是前者比較重要。這次獲取不到不要緊,在獲取鎖的時候jdk使用的是for循環。會不停的檢查隊列中節點是否能夠被喚醒的。這裏咱們理解是一個定時器。因此一次獲取不到節點不要緊。總有一次會被喚醒。 
         */
        for (Node t = tail; t != null && t != node; t = t.prev)
            /*head節點狀態應該是0,因此這裏最後s就是head.因此後面釋放* 的就是head的後繼節點。*/
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        /*這裏對應的是parkAndCheckInterrupt中的
        * LockSupport.lock(this)方法。unpark
        * 以後parkAndCheckInterrupt方法就會執行到Thread.interrupted
        * 並進行返回,這個時候回返回true*/
        LockSupport.unpark(s.thread);
}

acquire

  • 到這裏acquire執行步驟咱們按照方法維度一一進行閱讀了。咱們大概梳理下就是第一步獲取鎖,獲取失敗就會加入隊列,這個時候該線程會被阻塞,在加入隊列的過程當中會進行鍼對隊列進行無效節點去除(取消狀態或者參數null等狀況)。保證隊列裏的node都是有效且活躍的節點。這個過程會保證隊列是運轉的。若是加入隊列順利的話下一步就是自行的中斷線程進行掛起Thread.currentThread().interrupt();,其實執行到這一步就表示這個線程已經不須要了。被取消了。後續會將這個線程做廢。

下面貼出一個來自於博客園大神的原理圖dom

release

  • 獲取獨佔鎖的邏輯仍是很複雜的,裏面涉及到操做雙向鏈表的操做,若是沒有接觸過C++應該仍是很吃力的。其實在獲取的邏輯中已經牽涉了釋放的邏輯。在咱們喚醒node的後繼節點其實也是釋放邏輯中的重頭戲。


public final boolean release(int arg) {
    /*會調用tryRelease,這個方法是有子類實現的。咱們在ReentrantLock
    * 中應該是非公平鎖實現的tryRelease。這個方法後面會說。
    * 這裏咱們須要提一點的:當一個線程獲取到鎖時,它對應的Node是
    * 不會再隊列中的。因此這裏釋放咱們能夠理解成喚醒Head的後繼節點。
    * 這裏就和上面喚醒node的後繼節點同樣了。因此你會看到一樣的
    * 方法*unparkSuccessor(h)
    */
    if (tryRelease(arg)) {
        /*獲取CLH隊列中的隊首節點*/
        Node h = head;
        if (h != null && h.waitStatus != 0)
            /*喚醒head節點的後繼節點*/
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • 這裏須要解釋下爲何會對head節點進行判斷。由於AQS中head默認的null。那麼head是什麼建立的呢。是在咱們上面加鎖的時候加入,在加入隊列後須要進行前驅結點判斷的時候建立head的。這個時候的head沒有設置狀態。那麼這個狀態是默認0的。因此上面判斷只須要判空就好了。可是爲了嚴謹JDK進行雙重判斷了。
private transient volatile Node head;
  • 因此這裏須要對head進行判空。

tryRelease

  • 其實在上面acquire步驟講解中,咱們漏掉了tryAcquire方法的閱讀。目的是爲了和tryRelease方法進行合併講解。由於這兩個方法都是交由子類實現的。放在一塊兒講咱們更加能理解設計 。 在ReentrantLock中tryAcquire是有非公平鎖的nonfairTryAcquire實現的
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    /*首先獲取獨佔鎖的state*/
    int c = getState();
    if (c == 0) {
        /*c==0表示當前獨佔鎖沒有被任何線程佔用。這個時候是能夠加鎖的*/
        if (compareAndSetState(0, acquires)) {
            /*設置當前擁有次所的線程爲當前線程*/
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        /*由於這個判斷,實現了可重入式的鎖,這樣一個線程能夠重複上鎖操做。*/
        /*c!=0表示已有線程佔用。若是是當前線程的表示被重入了。那麼這個獨佔鎖state就會繼續累加。這裏的state是AQS的state和Node裏面waitStaus是兩回事。在這裏累加在釋放方法裏就是遞減。這樣對比咱們就容易理解了。這裏的status不一樣的實現有着不一樣的定位功能*/
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            /*這裏的判斷着實沒有看懂。但願大神指點。*/
            throw new Error("Maximum lock count exceeded");
        /*CAS設置state*/
        setState(nextc);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    /*看完tryAcquire中遞增的操做,咱們就能理解這裏遞減的邏輯了*/
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        /*c==0表示這個線程由於可重入的上鎖方式,徹底的釋放的獨佔鎖。這個時候才能夠被別的線程佔用*/
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

共享鎖

  • 共享鎖的實現主要應用場景就是在讀場景。獨佔鎖應用場景就是寫場景。這個在ReentrantReadWriteLock類中使用了這兩種場景。上面獨佔鎖咱們經過ReentrantLock閱讀了一遍。下面咱們經過ReentrantReadWriteLock來體驗下共享鎖的邏輯吧。ide

  • 共享鎖邏輯有所變更。可是裏面涉及到的方法在獨佔鎖中都提到了。下面咱們會說起下未提到的方法。公用的方法聰明的你應該是閱讀明白了。函數


  • 一樣tryAcquireShared方法這裏暫時不看。到後面和釋放方法一塊兒閱讀。咱們先來經過doAcquireShared方法爲入口進行閱讀

獲取共享鎖

doAcquireShared

/**
 * 這個方法仔細看其實和獨佔鎖acquire是同樣的邏輯。只不過方法全都提到方* 法內部了。
 * addWaiter和獨佔鎖中是一個方法
 * 後面的for循環也是同樣的,若是是head的後繼節點則會執嘗試獲取鎖,並替* 換head。而且若是線程阻塞過就會自行中斷線程等操做。因此看完獨佔鎖在* 學習共享鎖就很容易了。二者雖有不一樣可是仍是及其類似的
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                /*若是前驅節點是head節點就會去嘗試獲取鎖,有可能會成功*/
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    /*獲取成功就會將節點剔除,從而head節點指向最新節點*/
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate

/**
 * progagate表示當前共享鎖的容量
 * node 表示當前線程對應的Node
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 在方法外部已經確保了Progagate>=0
     * progagate=0表示當前共享鎖已經沒法被獲取了。因此這裏條件
     * 之一 progagte>0
     * 一、progagate>0 那麼就會查看隊列中後繼節點是否符合條件,若是符* 合的 則經過doReleaseShared方法進行喚醒隊列中head的後繼節點
     * 二、head==null 表示AQS尚未建立head這個時候出發釋放的方法是爲* 了讓釋放這個過程啓動。內部實現由於是for循環。至關於監聽head節點
     * 三、head.waitStatus<0 表示在doReleaseShared被設置成
     * Node.PROGAGATE屬性了。釋放鎖的時候會設置head的狀態從
     * SIGNAL置爲0,也會從0置爲PROGAGATE。head節點默認的狀態也是0,
     * 因此這裏的head狀態小於0只多是被另一個線程釋放資源是
     * 執行了置爲PROGAGATE的代碼了。雖然progagate==0可是隻是
     * 獲取那會是0在高併發場景下會被改變的。既然另一個線程釋放
     * 資源那麼這裏天然就能夠去喚醒隊列線程去嘗試獲取。這裏條件判斷
     * 咱們後面整個邏輯講完會從新梳理下這個地方
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            /*獲取head*/
            int ws = h.waitStatus;
            /*
             *head節點狀態默認是0,因此在隊列中第一次應該是進入
             *下面的if中而且設置head節點爲傳播狀態;設置成傳播狀
             *態的目的是爲了方便對應上面咱們方法中的
             *判斷h.waitStatus < 0 。這樣就會去喚醒head節點
             *的後繼節點了。這個時候可能會失敗可是共享就是讓他
             *們儘量的獲取。因此這裏設置傳播狀態。也有可能
             *會通過shouldParkAfterFailedAcquire方法將傳播
             *狀態糾偏爲SIGNAL狀態,也就是後面會被糾正過來。這個
             *時候須要和shouldParkAfterFailedAcquire對比,
             *shouldParkAfterFailedAcquire是遇到SIGNAL狀態對
             *後繼節點進行阻塞,而在這裏是遇到SIGNAL狀態就進行釋放
             */
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                /*與獨佔鎖同樣*/
                unparkSuccessor(h);
            }
            /*這裏就是設置傳播狀態,與setHeadAndPropagate方法對應*/
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

釋放共享鎖

  • tryReleaseShared同理是交由子類實現的。後面咱們經過ReentrantReadWriteLock類來看這兩個方法實現邏輯。最終AQS的釋放邏輯仍是放在的doReleaseShared方法上。

doReleaseShared

  • 在上面閱讀獲取共享鎖時,設置head節點後會檢查後繼節點,判斷是否須要喚醒的時候就是doReleaseShared 。 因此這個方法這裏也不須要說了。

tryAcquireShared

/**
 * 與讀鎖不衝突的前提下獲取寫鎖,有剩餘的前提下會一直獲取直至獲取成功,
 * 獲取失敗返回 -1
 * 獲取成功返回  1 
 */
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    /*exclusiveCount就是c與獨佔鎖容量的一個與運算。共享容量2^16-1  
     *因此只要c!=0  exclusiveCount(c)就!=0,另外一個條件時判斷是否
     *是當前線程。這個也是可重入式鎖的憑證
     */
    /*
     * 讀鎖和寫鎖是互斥的,因此這裏若是其餘線程已經獲取了寫鎖,那麼
     * 讀鎖就無法獲取了。
     */
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    /*sharedCount就是獲取共享鎖的容量*/
    int r = sharedCount(c);
    /*readerShouldBlock就是判斷是否須要對該節點進行阻塞,只要是有
     *效節點且是共享節點就不阻塞;讀鎖寫鎖是一個32位表示的,高位寫
     *鎖低位讀鎖,SHARED_UNIT是低16位,因此這裏就是增長讀鎖次數*/
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            /*表示第一次讀*/
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            /*第一次讀的線程重複讀,累計線程讀取次數*/
            firstReaderHoldCount++;
        } else {
            /*實際上就是一個ThreadLocal管理讀的次數。和上面firstReader做用同樣。*/
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    /*高併發場景下CAS疊加次數不必定會成功,這個時候須要*fullTryAcquireShared再次獲取讀鎖,這個方法邏輯和上面能夠說是
    *同樣的。那麼爲何他叫full ,由於裏面用了循環確保在有剩餘的條件
    *下一隻獲取讀鎖。不會由於CAS的問題獲取不到*/
    return fullTryAcquireShared(current);
}

tryReleaseShared

/**
 * 這裏只要有讀鎖存在就會返回false  ,這裏有個疑問,
 * 若是返回false那麼AQS的release就沒法去釋放隊列。這種狀況
 * 是由於隊列自己是活躍的。會按順序釋放鎖的。而讀鎖的釋放
 * 其實在tryReleseShared裏就釋放了。讀鎖其實就是計數。
 * 這裏會在ReentrantReadWriteLock章節詳細解說
 */
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 當前線程是第一個獲取讀鎖的。這裏會加讀的次數一直遞減。
        //當前線程所有釋放完了,就接觸當前線程的佔位
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        //這裏針對非第一個獲取讀鎖的線程進行釋放。顯示次數的釋放
        //徹底釋放後就丟棄線程
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //這裏和上面的fullTryAcquireShared對應。循環釋放一直到釋放成功爲止
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

總結

AQS是jdk中併發類的一個底層原理。好多jdk的併發類都是基於此實現的。AQS其實就是一個框架。簡單總結幾句話高併發

  • AQS是併發的一個基類
  • 內部維護了FIFO隊列
  • 擁有兩種模式: 獨佔模式(寫鎖)、共享模式(讀鎖)

內部state就是表示鎖的狀態。不一樣的實現能夠有不一樣的定義。

ReentrantLock : 純粹鎖的狀態 +一、-1 Semaphore : 鎖的個數 CountDownLatch: 計數器,一個標誌位

相關文章
相關標籤/搜索