AQS源碼分析看這一篇就夠了

好了,咱們來開始今天的內容,首先咱們來看下AQS是什麼,全稱是
AbstractQueuedSynchronizer翻譯過來就是【抽象隊列同步】對吧。經過名字咱們也能看出這是個抽象類

並且裏面定義了不少的方法

裏面這麼多方法,我們固然不是一個個去翻。裏面還有不少的抽象方法,我們還得找它的實現多麻煩對不對。因此咱們換個方式來探索。java

場景模擬

  咱們先來看下這樣一個場景

在這裏咱們有一個能被多個線程共享操做的資源,在這個場景中應該能看出咱們的數據是不安全的,由於咱們並不能保證咱們的操做是原子操做對吧。基於這個場景咱們經過代碼來看看效果node

package com.example.demo;

public class AtomicDemo {

    // 共享變量
    private static int count = 0;

    // 操做共享變量的方法
    public static void incr(){
        // 爲了演示效果  休眠一會兒
        try {
            Thread.sleep(1);
            count ++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}

經過執行發現,執行的結果是一個不肯定的值,但老是會小於等於1000,至於緣由,是由於incr() 方法不是一個原子操做。爲何不是原子操做這個我們今天就不深究此處了.
迎合今天的主題,咱們經過Lock來解決安全

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicDemo {

    // 共享變量
    private static int count = 0;

    private static Lock lock = new ReentrantLock();

    // 操做共享變量的方法
    public static void incr(){
        // 爲了演示效果  休眠一會兒
        try {
            lock.lock();
            Thread.sleep(1);
            count ++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}

而後咱們運行發現結果都是 1000了,這也就是1000個線程都去操做這個 count 變量,結果符合咱們的預期了。那lock究竟是怎麼實現的呢?函數

需求分析

  咱們先來分析分析

這樣的圖片看着比較複雜,我們簡化下。

咱們本身假設下,若是要你去設計這樣的方法,你應該要怎麼設計,他們須要實現哪些功能,
  首先是lock方法,它是否是要知足這幾個功能。

需求清楚了,那咱們怎麼設計呢?
第一個互斥怎麼作,也就是多個線程只有一個線程能搶佔到資源,這個時候咱們能夠這樣設置源碼分析

// 給一個共享資源
Int state = 0 ; // 0表示資源沒有被佔用,能夠搶佔
if(state == 0 ){
   // 表示能夠獲取鎖
}else{
   // 表示鎖被搶佔 須要阻塞等待
}


而後就是沒有搶佔到鎖的線程的存儲,咱們能夠經過一個隊列,利用FIFO來實現存儲。
最後就是線程的阻塞和喚醒。你們說說有哪些阻塞線程的方式呀?ui

1.wait/notify: 不合適,不能喚醒指定的線程
2.Sleep:休眠,相似於定時器
3.Condition:能夠喚醒特定線程
4.LockSupport:
LockSupport.park():阻塞當前線程
LockSupport.unpark(Thread t):喚醒特定線程
結合今天的主題,咱們選擇LockSupport來實現阻塞和喚醒。

好了,到這兒咱們已經猜測到了Lock中的實現邏輯,可是在探究源碼以前咱們還有個概念須要先和你們講下,由於這個是咱們源碼中會接觸到的一個,先講了,看的時候就比較輕鬆了對吧。this

什麼是重入鎖?

  咱們先來看看重入鎖的場景代碼spa

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicDemo {

    // 共享變量
    private static int count = 0;

    private static Lock lock = new ReentrantLock();

    // 操做共享變量的方法
    public static void incr(){
        // 爲了演示效果  休眠一會兒
        try {
            lock.lock();
            Thread.sleep(1);
            count ++;
            // 調用了另一個方法。
            decr();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void decr(){
        try {
            // 重入鎖
            lock.lock();
            count--;
        }catch(Exception e){

        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}

首先你們考慮這段代碼會死鎖嗎? 你們給我個回覆,我看看你們的理解的怎麼樣
好了,有說會死鎖的,有說不會,其實這兒是不會死鎖的,並且結果就是0.爲何呢?
  這個實際上是鎖的一個嵌套,由於這兩把鎖都是同一個 線程對象,咱們講共享變量的設計是
  當state=0;線程能夠搶佔到資源 state =1; 若是進去嵌套訪問 共享資源,這時 state = 2 若是有多個嵌套 state會一直累加,釋放資源的時候, state–,直到全部重入的鎖都釋放掉 state=0,那麼其餘線程才能繼續搶佔資源,說白了重入鎖的設計目的就是爲了防止 死鎖!線程

AQS類圖


經過類圖咱們能夠發現右車的業務應用其實內在都有相識的設計,這裏咱們只須要搞清楚其中的一個,其餘的你本身應該就能夠看懂~,好了咱們就具體結合前面的案例代碼,以ReentrantLock爲例來介紹AQS的代碼實現。翻譯

源碼分析

  在看源碼以前先回顧下這個圖,帶着問題去看,會更輕鬆

Lock.lock()

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

這個方法邏輯比較簡單,if條件成立說明 搶佔鎖成功並設置 當前線程爲獨佔鎖
else 表示搶佔失敗,acquire(1) 方法咱們後面具體介紹

compareAndSetState(0, 1):用到了CAS 是一個原子操做方法,底層是UnSafe.做用就是設置 共享操做的 state 由0到1. 若是state的值是0就修改成1

setExclusiveOwnerThread:代碼很簡單,進去看一眼便可

acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1.tryAcquire()嘗試直接去獲取資源,若是成功則直接返回(這裏體現了非公平鎖,每一個線程獲取鎖時會嘗試直接搶佔加塞一次,而CLH隊列中可能還有別的線程在等待);
2.addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
3.acquireQueued()使線程阻塞在等待隊列中獲取資源,一直獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。
  固然這裏代碼的做用我是提早研究過的,對於你們確定不是很清楚,咱們繼續裏面去看,最後你們能夠回到這兒再論證。

tryAcquire(int)

  再次嘗試搶佔鎖

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
//再次嘗試搶佔鎖
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
// 重入鎖的狀況
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
// false 表示搶佔失敗
    return false;
}

addWaiter

  將阻塞的線程添加到雙向鏈表的結尾

private Node addWaiter(Node mode) {
    //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //嘗試快速方式直接放到隊尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //上一步失敗則經過enq入隊。
    enq(node);
    return node;
}

enq(Node)

private Node enq(final Node node) {
    //CAS"自旋",直到成功加入隊尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 隊列爲空,建立一個空的標誌結點做爲head結點,並將tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入隊尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第一個if語句

else語句

線程3進來會執行以下代碼

那麼效果圖

acquireQueued(Node, int)
  OK,經過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。聰明的你馬上應該能想到該線程下一部該幹什麼了吧:進入等待狀態休息,直到其餘線程完全釋放資源後喚醒本身,本身再拿到資源,而後就能夠去幹本身想幹的事了。沒錯,就是這樣!是否是跟醫院排隊拿號有點類似~~acquireQueued()就是幹這件事:在等待隊列中排隊拿號(中間沒其它事幹能夠休息),直到拿到號後再返回。這個函數很是關鍵,仍是上源碼吧:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//標記是否成功拿到資源
    try {
        boolean interrupted = false;//標記等待過程當中是否被中斷過

        //又是一個「自旋」!
        for (;;) {
            final Node p = node.predecessor();//拿到前驅
            //若是前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源(多是老大釋放完資源喚醒本身的,固然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到資源後,將head指向該結點。因此head所指的標杆結點,就是當前獲取到資源的那個結點或null。
                p.next = null; // setHead中node.prev已置爲null,此處再將head.next置爲null,就是爲了方便GC回收之前的head結點。也就意味着以前拿完資源的結點出隊了!
                failed = false; // 成功獲取資源
                return interrupted;//返回等待過程當中是否被中斷過
            }

            //若是本身能夠休息了,就經過park()進入waiting狀態,直到被unpark()。若是不可中斷的狀況下被中斷了,那麼會從park()中醒過來,發現拿不到資源,從而繼續進入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//若是等待過程當中被中斷過,哪怕只有那麼一次,就將interrupted標記爲true
        }
    } finally {
        if (failed) // 若是等待過程當中沒有成功獲取資源(如timeout,或者可中斷的狀況下被中斷了),那麼取消結點在隊列中的等待。
            cancelAcquire(node);
    }
}

到這裏了,咱們先不急着總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。

shouldParkAfterFailedAcquire(Node, Node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驅的狀態
    if (ws == Node.SIGNAL)
        //若是已經告訴前驅拿完號後通知本身一下,那就能夠安心休息了
        return true;
    if (ws > 0) {
        /*
         * 若是前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
         * 注意:那些放棄的結點,因爲被本身「加塞」到它們前邊,它們至關於造成一個無引用鏈,稍後就會被保安大叔趕走了(GC回收)!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //若是前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號後通知本身一下。有可能失敗,人家說不定剛剛釋放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

整個流程中,若是前驅結點的狀態不是SIGNAL,那麼本身就不能安心去休息,須要去找個安心的休息點,同時能夠再嘗試下看有沒有機會輪到本身拿號。

parkAndCheckInterrupt()

  若是線程找好安全休息點後,那就能夠安心去休息了。此方法就是讓線程去休息,真正進入等待狀態。

private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);//調用park()使線程進入waiting狀態
     return Thread.interrupted();//若是被喚醒,查看本身是否是被中斷的。
 }

好了,咱們能夠小結下了。

看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),如今讓咱們再回到acquireQueued(),總結下該函數的具體流程:

1.結點進入隊尾後,檢查狀態,找到安全休息點;
2.調用park()進入waiting狀態,等待unpark()或interrupt()喚醒本身;
3.被喚醒後,看本身是否是有資格能拿到號。若是拿到,head指向當前結點,並返回從入隊到拿到號的整個過程當中是否被中斷過;若是沒拿到,繼續流程1。
最後咱們再回到前面的acquire方法來總結下

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

總結下它的流程吧

1.調用自定義同步器的tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;
2.沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
3.acquireQueued()使線程在等待隊列中休息,有機會時(輪到本身,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
4.若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。

Lock.unlock()

  好了,lock方法看完後,咱們再來看下unlock方法

release(int)

  它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。這也正是unlock()的語義,固然不只僅只限於unlock()

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到頭結點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//喚醒等待隊列裏的下一個線程
        return true;
    }
    return false;
}

tryRelease(int)

  此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:

public final boolean release(int arg) {
        if (tryRelease(arg)) {//這裏是先嚐試釋放一下資源,通常均可以釋放成功,除了屢次重入但只釋放一次的狀況。
            Node h = head;
            //這裏判斷的是 阻塞隊列是否還存在和head節點是不是tail節點,由於以前說過,隊列的尾節點的waitStatus是爲0的
            if (h != null && h.waitStatus != 0)
                //到這裏就說明head節點已經釋放成功啦,就先去叫醒後面的直接節點去搶資源吧
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
    //這裏,node通常爲當前線程所在的結點。
    int ws = node.waitStatus;
    if (ws < 0)//置零當前線程所在的結點狀態,容許失敗。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一個須要喚醒的結點s
    if (s == null || s.waitStatus > 0) {//若是爲空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
            if (t.waitStatus <= 0)//從這裏能夠看出,<=0的結點,都是還有效的結點。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}

這個函數並不複雜。一句話歸納:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裏咱們也用s來表示吧。此時,再和acquireQueued()聯繫起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即便p!=head也不要緊,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裏既然s已是等待隊列中最前邊的那個未放棄線程了,那麼經過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),而後s把本身設置成head標杆結點,表示本身已經獲取到資源了,acquire()也返回了

  好了,到這咱們就由於把源碼看完了,再回頭來看下這張圖

是否是就清楚了AQS究竟是怎麼實現的咱們上面的猜測的了吧。那麼對應的下課後讓你本身去看
這幾個的源碼,你是否是就應該能看懂了,好了本文就介紹到此,本文對你有幫助的歡迎關注點贊,謝謝

相關文章
相關標籤/搜索