java併發編程系列:牛逼的AQS(下)

標籤: 「咱們都是小青蛙」公衆號文章java

Condition

ReentrantLock的內部實現

看完了AQS中的底層同步機制,咱們來簡單分析一下以前介紹過的ReentrantLock的實現原理。先回顧一下這個顯式鎖的典型使用方式:程序員

Lock lock = new ReentrantLock();
lock.lock();
try {
    加鎖後的代碼
} finally {
    lock.unlock();     
}
複製代碼

ReentrantLock首先是一個顯式鎖,它實現了Lock接口。可能你已經忘記了Lock接口長啥樣了,咱們再回顧一遍:bash

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
複製代碼

其實ReentrantLock內部定義了一個AQS的子類來輔助它實現鎖的功能,因爲ReentrantLock是工做在獨佔模式下的,因此它的lock方法實際上是調用AQS對象的aquire方法去獲取同步狀態,unlock方法實際上是調用AQS對象的release方法去釋放同步狀態,這些你們已經很熟了,就再也不贅述了,咱們大體看一下ReentrantLock的代碼:多線程

public class ReentrantLock implements Lock {

    private final Sync sync;    //AQS子類對象
    
    abstract static class Sync extends AbstractQueuedSynchronizer { 
        // ... 爲節省篇幅,省略其餘內容
    }
    
    // ... 爲節省篇幅,省略其餘內容
}
複製代碼

因此若是咱們簡簡單單寫下下邊這行代碼:工具

Lock lock = new ReentrantLock();
複製代碼

就意味着在內存裏建立了一個ReentrantLock對象,一個AQS對象,在AQS對象裏維護着同步隊列head節點和tail節點,不過初始狀態下因爲沒有線程去競爭鎖,因此同步隊列是空的,畫成圖就是這樣:學習

image_1c3hf30h3bmodidrvogfe11oh2q.png-16.3kB

Condition的提出

咱們前邊嘮叨線程間通訊的時候提到過內置鎖的wait/notify機制,等待線程的典型的代碼以下:優化

synchronized (對象) {
    處理邏輯(可選)
    while(條件不知足) {
        對象.wait();
    }
    處理邏輯(可選)
}
複製代碼

通知線程的典型的代碼以下:ui

synchronized (對象) {
    完成條件
    對象.notifyAll();、
}
複製代碼

也就是當一個線程由於某個條件不能知足時就能夠在持有鎖的狀況下調用該鎖對象的wait方法,以後該線程會釋放鎖並進入到與該鎖對象關聯的等待隊列中等待;若是某個線程完成了該等待條件,那麼在持有相同鎖的狀況下調用該鎖的notify或者notifyAll方法喚醒在與該鎖對象關聯的等待隊列中等待的線程。this

顯式鎖的本質實際上是經過AQS對象獲取和釋放同步狀態,而內置鎖的實現是被封裝在java虛擬機裏的,咱們並無講過,這二者的實現是不同的。而wait/notify機制只適用於內置鎖,在顯式鎖裏須要另外定義一套相似的機制,在咱們定義這個機制的時候須要整清楚:在獲取鎖的線程由於某個條件不知足時,應該進入哪一個等待隊列,在何時釋放鎖,若是某個線程完成了該等待條件,那麼在持有相同鎖的狀況下怎麼從相應的等待隊列中將等待的線程從隊列中移出spa

爲了定義這個等待隊列,設計java的大叔們在AQS中添加了一個名叫ConditionObject的成員內部類:

public abstract class AbstractQueuedSynchronizer {
    
    public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;

        // ... 爲省略篇幅,省略其餘方法
    }
}
複製代碼

很顯然,這個ConditionObject維護了一個隊列,firstWaiter是隊列的頭節點引用,lastWaiter是隊列的尾節點引用。可是節點類是Node?對,你沒看錯,就是咱們前邊分析的同步隊列裏用到的AQS的靜態內部類Node,怕你忘了,再把這個Node節點類的主要內容寫一遍:

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
}
複製代碼

也就是說:AQS中的同步隊列和自定義的等待隊列使用的節點類是同一個

又因爲在等待隊列中的線程被喚醒的時候須要從新獲取鎖,也就是從新獲取同步狀態,因此該等待隊列必須知道線程是在持有哪一個鎖的時候開始等待的。設計java的大叔們在Lock接口中提供了這麼一個經過鎖來獲取等待隊列的方法:

Condition newCondition();
複製代碼

咱們上邊介紹的ConditionObject就實現了Condition接口,看一下ReentrantLock鎖是怎麼獲取與它相關的等待隊列的:

public class ReentrantLock implements Lock {

    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        // ... 爲節省篇幅,省略其餘方法
    }
    
    public Condition newCondition() {
        return sync.newCondition();
    }
    
    // ... 爲節省篇幅,省略其餘方法
}
複製代碼

能夠看到,其實就是簡單建立了一個ConditionObject對象而已~ 因爲 ConditionObject 是AQS 的成員內部類,因此在建立的 ConditionObject 對象中持有 AQS 對象的引用,因此經過 ConditionObject 對象訪問到 同步隊列,也就是能夠從新獲取同步狀態,也就是從新獲取鎖 。用文字描述仍是有些繞,咱們先經過鎖來建立一個Condition對象:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
複製代碼

因爲在初始狀態下,沒有線程去競爭鎖,因此同步隊列是空的,也沒有線程因某個條件不成立而進入等待隊列,因此等待隊列也是空的,ReentrantLock對象、AQS對象以及等待隊列在內存中的表示就如圖:

image_1c3hji5a4uvn1sdj1ro33hm87m61.png-26.7kB

固然,這個newCondition方法能夠反覆調用,從而能夠經過一個鎖來生成多個等待隊列

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
複製代碼

那接下來須要考慮怎麼把線程包裝成Node節點放到等待隊列的以及怎麼從等待隊列中移出了。ConditionObject成員內部類實現了一個Condition的接口,這個接口提供了下邊這些方法:

public interface Condition {
    void await() throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void awaitUninterruptibly();
    void signal();
    void signalAll();
}
複製代碼

來看一下這些方法的具體意思:

方法名 描述
void await() 當前線程進入等待狀態,直到被通知(調用signal或者signalAll方法)或中斷
boolean await(long time, TimeUnit unit) 當前線程在指定時間內進入等待狀態,若是超出指定時間或者在等待狀態中被通知或中斷則返回
long awaitNanos(long nanosTimeout) 與上個方法相同,只不過默認使用的時間單位爲納秒
boolean awaitUntil(Date deadline) 當前線程進入等待狀態,若是到達最後期限或者在等待狀態中被通知或中斷則返回
void awaitUninterruptibly() 當前線程進入等待狀態,直到在等待狀態中被通知,須要注意的時,本方法並不相應中斷
void signal() 喚醒一個等待線程。
void signalAll() 喚醒全部等待線程。

能夠看到,Condition中的await方法和內置鎖對象的wait方法的做用是同樣的,都會使當前線程進入等待狀態,signal方法和內置鎖對象的notify方法的做用是同樣的,都會喚醒在等待隊列中的線程。

像調用內置鎖的wait/notify方法時,線程須要首先獲取該鎖同樣,調用Condition對象的await/siganl方法的線程須要首先得到產生該Condition對象的顯式鎖。它的基本使用方式就是:經過顯式鎖的 newCondition 方法產生Condition對象,線程在持有該顯式鎖的狀況下能夠調用生成的Condition對象的 await/signal 方法,通常用法以下:

Lock lock = new ReentrantLock();

Condition condition = lock.newCondition();

//等待線程的典型模式
public void conditionAWait() throws InterruptedException {
    lock.lock();    //獲取鎖
    try {
        while (條件不知足) {
            condition.await();  //使線程處於等待狀態
        }
        條件知足後執行的代碼;
    } finally {
        lock.unlock();    //釋放鎖
    }
}

//通知線程的典型模式
public void conditionSignal() throws InterruptedException {
    lock.lock();    //獲取鎖
    try {
        完成條件;
        condition.signalAll();  //喚醒處於等待狀態的線程
    } finally {
        lock.unlock();    //釋放鎖
    }
}
複製代碼

假設如今有一個鎖和兩個等待隊列:

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
複製代碼

畫圖表示出來就是:

image_1c3hjlt3n14rl1i9g1epg3371ce16e.png-39.7kB

有3個線程maint1t2同時調用ReentrantLock對象的lock方法去競爭鎖的話,只有線程main獲取到了鎖,因此會把線程t1t2包裝成Node節點插入同步隊列,因此ReentrantLock對象、AQS對象和同步隊列的示意圖就是這樣的:

image_1c3hjmnj819nl57f11l11p7f9196r.png-94.5kB

由於此時main線程是獲取到鎖處於運行中狀態,可是由於某個條件不知足,因此它選擇執行下邊的代碼來進入condition1等待隊列:

lock.lock();
try {
    contition1.await();
} finally {
    lock.unlock();
}
複製代碼

具體的await代碼咱們就不分析了,太長了,我怕你看的發睏,這裏只看這個await方法作了什麼事情:

  1. condition1等待隊列中建立一個Node節點,這個節點的thread值就是main線程,並且waitStatus-2,也就是靜態變量Node.CONDITION,表示表示節點在等待隊列中,因爲這個節點是表明線程main的,因此就把它叫作main節點把,新建立的節點長這樣:

    image_1c3hs5hvfu3g186m1g8m9tjdg9cg.png-14.1kB

  2. 將該節點插入condition1等待隊列中:

    image_1c3hs74l64m11n6eedc357acvct.png-118.9kB

  3. 由於main線程還持有者鎖,因此須要釋放鎖以後通知後邊等待獲取鎖的線程t,因此同步隊列裏的0號節點被刪除,線程t獲取鎖,節點1稱爲head節點,而且把thread字段設置爲null:

    image_1c3hs8phe1r1smrl12q41231hfuda.png-103.4kB

至此,main線程的等待操做就作完了,假如如今得到鎖的t1線程也執行下邊的代碼:

lock.lock();
try {
    contition1.await();
} finally {
    lock.unlock();
}
複製代碼

仍是會執行上邊的過程,把t1線程包裝成Node節點插入到condition1等待隊列中去,因爲原來在等待隊列中的節點1會被刪除,咱們把這個新插入等待隊列表明線程t1的節點稱爲新節點1吧:

image_1c3hshhsik77531ribb6kv57e4.png-112.2kB

這裏須要特別注意的是:同步隊列是一個雙向鏈表,prev表示前一個節點,next表示後一個節點,而等待隊列是一個單向鏈表,使用nextWaiter表示下一個節點,這是它們不一樣的地方

如今獲取到鎖的線程是t2,你們一塊兒出來混的,前兩個都進去,只剩下t2多很差呀,不過此次不放在condition1隊列後頭了,換成condition2隊列吧:

lock.lock();
try {
    contition2.await();
} finally {
    lock.unlock();
}
複製代碼

效果就是:

image_1c3hsjumr5jb3c5cqhdk57tieh.png-127.6kB

你們發現,雖然如今沒有線程獲取鎖,也沒有線程在鎖上等待,可是同步隊列裏仍舊有一個節點,是的,同步隊列只有初始時無任何線程由於鎖而阻塞的時候才爲空,只要曾經有線程由於獲取不到鎖而阻塞,這個隊列就不爲空了

至此,maint1t2這三個線程都進入到等待狀態了,都進去了誰把它們弄出來呢???額~ 好吧,再弄一個別的線程去獲取同一個鎖,比方說線程t3去把condition2條件隊列的線程去喚醒,能夠調用這個signal方法:

lock.lock();
try {
    contition2.signal();
} finally {
    lock.unlock();
}
複製代碼

由於在condition2等待隊列的線程只有t2,因此t2會被喚醒,這個過程分兩步進行:

  1. 將在condition2等待隊列的表明線程t2新節點2,從等待隊列中移出。

  2. 將移出的節點2放在同步隊列中等待獲取鎖,同時更改該節點的waitStauts0

這個過程的圖示以下:

image_1c3hsv52u8i64rgsdo2t1lngeu.png-119.3kB

若是線程t3繼續調用signalAllcondition1等待隊列中的線程給喚醒也是差很少的意思,只不過會把condition1上的兩個節點同時都移動到同步隊列裏:

lock.lock();
try {
    contition1.signalAll();
} finally {
    lock.unlock();
}
複製代碼

效果如圖:

image_1c3hthb14i21a58i5168p1a1bfb.png-98.9kB

這樣所有線程都從等待狀態中恢復了過來,能夠從新競爭鎖進行下一步操做了。

以上就是Condition機制的原理和用法,它實際上是內置鎖的wait/notify機制在顯式鎖中的另外一種實現,不過原來的一個內置鎖對象只能對應一個等待隊列,如今一個顯式鎖能夠產生若干個等待隊列,咱們能夠根據線程的不一樣等待條件來把線程放到不一樣的等待隊列上去Condition機制的用途能夠參考wait/notify機制,咱們接下來把以前用內置鎖和wait/notify機制編寫的同步隊列BlockedQueue顯式鎖 + Condition的方式來該寫一下:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockedQueue<E> {

    private Lock lock = new ReentrantLock();

    private Condition notEmptyCondition = lock.newCondition();

    private Condition notFullCondition = lock.newCondition();

    private Queue<E> queue = new LinkedList<>();

    private int limit;

    public ConditionBlockedQueue(int limit) {
        this.limit = limit;
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public boolean add(E e) throws InterruptedException {
        lock.lock();
        try {
            while (size() >= limit) {
                notFullCondition.await();
            }

            boolean result = queue.add(e);
            notEmptyCondition.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    public E remove() throws InterruptedException{
        lock.lock();
        try {
            while (size() == 0) {
                notEmptyCondition.await();
            }
            E e = queue.remove();
            notFullCondition.signalAll();
            return e;
        } finally {
            lock.unlock();
        }
    }
}
複製代碼

在這個隊列裏邊咱們用了一個ReentrantLock鎖,經過這個鎖生成了兩個Condition對象,notFullCondition表示隊列未滿的條件,notEmptyCondition表示隊列未空的條件。當隊列已滿的時候,線程會在notFullCondition上等待,每插入一個元素,會通知在notEmptyCondition條件上等待的線程;當隊列已空的時候,線程會在notEmptyCondition上等待,每移除一個元素,會通知在notFullCondition條件上等待的線程。這樣語義就變得很明顯了。若是你有更多的等待條件,你能夠經過顯式鎖生成更多的Condition對象。而每一個內置鎖對象都只能有一個相關聯的等待隊列,這也是顯式鎖對內置鎖的優點之一

咱們總結一下上邊的用法:每一個顯式鎖對象又能夠產生若干個Condition對象,每一個Condition對象都會對應一個等待隊列,因此就起到了一個顯式鎖對應多個等待隊列的效果

AQS中其餘針對等待隊列的重要方法

除了Condition對象的awaitsignal方法,AQS還提供了許多直接訪問這個隊列的方法,它們由都是public final修飾的:

public abstract class AbstractQueuedSynchronizer {
    public final boolean owns(ConditionObject condition) public final boolean hasWaiters(ConditionObject condition) {}
     public final int getWaitQueueLength(ConditionObject condition) {}
     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {}
}
複製代碼
方法名 描述
owns 查詢是否經過本AQS對象生成的指定的 ConditionObject對象
hasWaiters 指定的等待隊列裏是否有等待線程
getWaitQueueLength 返回正在等待此條件的線程數估計值。由於在構造該結果時,多線程環境下實際線程集合可能發生大的變化
getWaitingThreads 返回正在等待此條件的線程集合的估計值。由於在構造該結果時,多線程環境下實際線程集合可能發生大的變化

若是有須要的話,能夠在咱們自定義的同步工具中使用它們。

題外話

寫文章挺累的,有時候你以爲閱讀挺流暢的,那實際上是背後無數次修改的結果。若是你以爲不錯請幫忙轉發一下,萬分感謝~ 這裏是個人公衆號「咱們都是小青蛙」,裏邊有更多技術乾貨,時不時扯一下犢子,歡迎關注:

小冊

另外,做者還寫了一本MySQL小冊:《MySQL是怎樣運行的:從根兒上理解MySQL》的連接 。小冊的內容主要是從小白的角度出發,用比較通俗的語言講解關於MySQL進階的一些核心概念,好比記錄、索引、頁面、表空間、查詢優化、事務和鎖等,總共的字數大約是三四十萬字,配有上百幅原創插圖。主要是想下降普通程序員學習MySQL進階的難度,讓學習曲線更平滑一點~ 有在MySQL進階方面有疑惑的同窗能夠看一下:

相關文章
相關標籤/搜索