AQS詳解,併發編程的半壁江山

千呼萬喚始出來,終於寫到AQS這個一章了,其實爲了寫這一章,前面也是作了不少的鋪墊,好比以前的html

深度理解volatile關鍵字 線程之間的協做(等待通知模式) JUC 經常使用4大併發工具類 CAS 原子操做 顯示鎖 瞭解LockSupport工具類java

這些文章其實都是爲了讓你們理解AQS而寫的鋪墊,就像吃東西須要一口一口的吃同樣編程

AQS概述及其實現類:

  AQS,是AbstractQuenedSynchronizer的縮寫,中文名稱爲抽象的隊列式同步器,是java併發編程這一塊的半壁江山,這個類存在於在java.util.concurrent.locks包,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,好比以前寫的顯示鎖ReentrantLock,,讀寫鎖ReentrantReadWriteLock,JUC的四大併發工具類中的Semaphore,CountDownLatch,線程池暫時還沒寫以後再寫設計模式

 

 

 在JDK1.7以前,FutureTask,應該也是繼承了AQS來實現的,可是1.8以後就改變了安全

 

 

 可是實現思想應該沒有太大改變,,因此說AQS是併發編程的半壁江山網絡

 

核心思想:

  若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,並將共享資源設置爲鎖定狀態,若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。數據結構

CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列,虛擬的雙向隊列即不存在隊列實例,僅存在節點之間的關聯關係。
AQS是將每一條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node),來實現鎖的分配。多線程

其實在我理解來講,AQS就是基於CLH隊列,用volatile修飾共享變量state,來保證變量的可見性,線程經過CAS去改變狀態符,保證狀態的原子性,成功則獲取鎖成功,失敗則進入等待隊列,等待被喚醒。併發

注意:AQS是自旋鎖:在等待喚醒的時候,常常會使用自旋(while(!cas()))的方式,不停地嘗試獲取鎖,直到被其餘線程獲取成功框架

 

框架:

 

 

經過這個圖得知,AQS維護了一個volatile int state和一個FIFO線程等待隊列,多線程爭用資源被阻塞的時候就會進入這個隊列。state就是共享資源,其訪問方式有以下三種:
getState();setState();compareAndSetState();

AQS 定義了兩種資源共享方式:
1.Exclusive:獨佔,只有一個線程能執行,如ReentrantLock
2.Share:共享,多個線程能夠同時執行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

不一樣的自定義的同步器爭用共享資源的方式也不一樣。

AQS底層使用了模板方法模式

同步器的設計是基於模板方法模式的,若是不瞭解的能夠去看看模板方法設計模式,以前在寫設計模式的六大設計原則的時候也說了,看看設計模式有助於理解源碼,若是須要自定義同步器通常的方式是這樣:

  1. 使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。
  2. 將AQS組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法,就相似於我定義了一個骨架,你填充東西同樣

自定義同步器在實現的時候只須要實現共享資源state的獲取和釋放方式便可,至於具體線程等待隊列的維護,AQS已經在頂層實現好了。自定義同步器實現的時候主要實現下面幾種方法:

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。

  以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

  再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。

  通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

  在acquire() acquireShared()兩種方式下,線程在等待隊列中都是忽略中斷的,acquireInterruptibly()/acquireSharedInterruptibly()是支持響應中斷的。

繼承AQS,手寫獨佔式可重入鎖:

  說了那麼多,可是說一千道一萬不如本身手寫試試,接下來看代碼

package org.dance.day4.aqs;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 採用主類實現Lock接口,內部類繼承AQS,封裝細節
 * 自定義鎖
 * @author ZYGisComputer
 */
public class CustomerLock implements Lock {

    private final Sync sync = new Sync();

    /**
     * 採用內部類來繼承AQS,封裝細節
     *  實現獨佔鎖,經過控制state狀態開表示鎖的狀態
     *      state:1 表明鎖已被佔用
     *      state:0 表明鎖能夠被佔用
     */
    private static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0,1)){
                // 當前線程獲取到鎖
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }else{
                return false;
            }
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 若是狀態爲沒人佔用,還去釋放,就報錯
            if(getState()==0){
                throw new UnsupportedOperationException();
            }
            // 把鎖的佔用者制空
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 判斷線程是否佔用資源
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState()==1;
        }

        /**
         * 獲取Condition接口
         * @return
         */
        public Condition getCondition(){
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.getCondition();
    }
}

工具類:

package org.dance.tools;

import java.util.concurrent.TimeUnit;

/**
 * 類說明:線程休眠輔助工具類
 */
public class SleepTools {

    /**
     * 按秒休眠
     * @param seconds 秒數
     */
    public static final void second(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }

    /**
     * 按毫秒數休眠
     * @param seconds 毫秒數
     */
    public static final void ms(int seconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }
}

測試類:

package org.dance.day4.aqs;


import org.dance.tools.SleepTools;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 *類說明:測試手寫鎖
 */
public class TestMyLock {
    public static void main(String[] args) {
        TestMyLock testMyLock = new TestMyLock();
        testMyLock.test();
    }
    public void test() {
        // 先使用ReentrantLock 而後替換爲咱們本身的Lock
        final Lock lock = new ReentrantLock();

        class Worker extends Thread {
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepTools.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepTools.second(1);
                    } finally {
                        lock.unlock();
                    }
                    SleepTools.second(2);
                }
            }
        }
        // 啓動10個子線程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        // 主線程每隔1秒換行
        for (int i = 0; i < 10; i++) {
            SleepTools.second(1);
            System.out.println();
        }
    }
}

執行結果:

Thread-0



Thread-1

Thread-2



Thread-3

Thread-4

經過結果能夠看出來每次都是隻有一個線程在執行的,線程的鎖獲取沒有問題,接下來換咱們本身的鎖

final Lock lock = new CustomerLock();

再次執行測試

執行結果:

Thread-0


Thread-1


Thread-2


Thread-3


Thread-4

因而可知,這個手寫的鎖,和ReentrantLock是同樣的效果,是否是感受也挺簡單的,也沒有多少行代碼

那麼獨佔鎖,被一個線程佔用着,其餘線程去了哪裏?不要走開接下來進入AQS的源碼看看

 

理論:

 

 

 

 在AQS中的數據結構是採用同步器+一個雙向循環鏈表的數據結構,來存儲等待的節點的,由於雙向鏈表是沒有頭的,可是爲了保證喚醒的操做,同步器中的head標誌了鏈表中的一個節點爲頭節點,也就是將要喚醒的,也標識了一個尾節點

 

 

結點狀態waitStatus,須要保證可見性,用volatile修飾

      這裏咱們說下Node。Node結點是對每個等待獲取資源的線程的封裝,其包含了須要同步的線程自己及其等待狀態,如是否被阻塞、是否等待喚醒、是否已經被取消等。變量waitStatus則表示當前Node結點的等待狀態,共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示當前結點已取消調度。當timeout或被中斷(響應中斷的狀況下),會觸發變動爲此狀態,進入該狀態後的結點將不會再變化。

  • SIGNAL(-1):表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新爲SIGNAL。

  • CONDITION(-2):表示結點等待在Condition上,當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。

  • PROPAGATE(-3):共享模式下,前繼結點不只會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。

  • 0:新結點入隊時的默認狀態。

注意,負值表示結點處於有效等待狀態,而正值表示結點已被取消。因此源碼中不少地方用>0、<0來判斷結點的狀態是否正常

同步隊列中節點的增長和移除

 

 

 經過圖能夠看出來,在增長 尾節點的時候須要經過CAS設置,由於多是多個線程同時設置,可是移除首節點的時候是不須要的,由於這個操做是由同步器操做的,而且首節點只有一個

獨佔式同步狀態的獲取與釋放

 

 

 AQS的從線程的獲取同步狀態到,對同步隊列的維護,到釋放,的流程圖就是這樣的,有興趣看源碼的本身去跟一下,就是主要實現的模板方法,

注意:其實在這個給你們提個醒,看源碼的時候,找核心的看,找主要的看,不要一行一行的扣着看,沒有意義,還有就是調用過程複雜,體會核心流程就能夠

 

以前寫了<<Lock接口之Condition接口>>這一章,而後在這裏寫一下Condition接口在AQS裏面的實現吧,由於無論本身寫鎖也好,默認鎖的實現也好,用的Condition都是AQS默認寫好的

Condition實現分析:

 

 

 

 一個鎖是能夠有多個Condition的,每一個Condition都包含一個本身的等待隊列,不一樣於Object屬於同一個對象等待,他存在一個單鏈表結構的等待隊列,清晰的知道要喚醒本身的等待隊列中的節點,因此採用signal方法而不是signalall

固然採用的類仍是Node類固然單鏈表其實就是沒有上一個節點的引用而已

等待隊列和同步隊列採用的是相同的類,只不過是實現的數據機構確是不同的而已

最終一個鎖的實例化會成爲上圖中第二個圖的這種形式,Demo也就是以前寫的<<Lock接口之Condition接口>>中的用的鎖最終造成的結構及時就是維持了一個同步隊列和兩個等待隊列,鎖用於控制併發,而兩個隊列用於控制地點變化和千米數變化的不一樣的等待通知模式

節點在隊列中的移動

 

 

 就是在當前線程await的時候從同步隊列移除後加入到等待隊列尾部,而喚醒就是從等待隊列移除後加入到同步隊列尾部,兩個隊列相互轉換的過程,之因此採用同一個類,就是爲了方便的在不一樣隊列中相互轉化

固然這也是爲何不推薦使用SignalAll方法的緣由,由於若是一個等待隊列中有不少的線程在等待,所有喚醒後,最多且只能有一個線程獲取到同步狀態,其餘線程所有要被加入到同步隊列的末尾,並且也可能當前的同步狀態被別人持有,一個線程也獲取不到,所有都要被加入同步隊列中,因此不推薦使用SignalAll,推薦是用Signal

其實也能夠想象,好比wait和notify/notifyAll 在寫<<線程之間的協做(等待通知模式)>>這篇文章的時候的最後一個問題也能夠大概想象一下,應該也是維持了一個同步隊列,可是等待隊列應該是隻有一個,因此,被喚醒的是第一個等待的節點,可是它沒有辦法保證要被喚醒的節點必定是在頭一個,只能喚醒所有的節點,來保證須要喚醒的線程必定被喚醒,大概也是這樣的一個節點的移動,根據網絡文章的描述,應該八九不離十

根據猜想,結合上方的Condition接口分析,因此說,在wait,notify/notifyAll中推薦使用notifyAll,防止第一個節點不是須要喚醒的節點,形成喚醒錯誤,可是Condition是知道的,被喚醒的必定是須要喚醒的,不會喚醒錯誤,因此說,推薦使用signal

能看到這裏的證實你真的很愛這個行業,你是最棒的!加油

回顧Lock的實現

ReentrantLock

其實在上面手寫的鎖,是有一些缺陷的,由於判斷的是否是等於1,因此他是一個不支持可重入的,一旦重入,就會形成死鎖,本身鎖住本身,可是ReentrantLock就不會

他支持鎖的可重入,而且支持鎖的公平和非公平

 

 經過源碼能夠看到,他是經過狀態的累加完成的鎖的可重入,固然前提是已經拿到鎖的線程,會有這樣一個判斷

 

 因此可想而知,釋放的時候,每次釋放就遞減,最終等於0的時候完成鎖的釋放

 

 在實現公平鎖的時候,就是判斷當前節點是否有前期節點,是否是第一個,若是有,不是第一個,抱歉你不能搶鎖

 

 可想而知在非公平鎖中就是不判斷而已

由於不須要判斷,而且是誰搶到鎖,鎖就是誰的,因此說非公平鎖比公平鎖效率高

 

ReentrantReadWriteLock

在讀寫鎖中,一個狀態如何 保存兩個狀態呢?採用位數分割

 

 應該有知道 int是32位的,他把32位一分爲二,採用低位保存寫的狀態,高位保存讀的狀態

寫鎖,應該都知道,只能同時被一個線程持有,因此重入的話,也比較好保存

可是讀鎖不同,能夠被多個線程同時持有,是共享鎖,而且重入的次數是不同的,那麼該則麼保存呢?採用高位只保存被多少線程持有

 

 採用每一個持有鎖的線程中的一個HoldCounter對象保存,使用ThreadLocalHoldCounter繼承ThreadLocal來保存線程變量,區別不一樣線程

讀寫鎖的升級和降級

讀寫鎖支持寫鎖降級爲讀鎖,可是不支持讀鎖升級爲寫鎖,爲了保證線程安全和數據可見性,由於在寫鎖執行期間,讀鎖是被阻塞的,因此說寫鎖降級爲讀鎖是沒有問題的,可是若是是讀鎖升級爲寫鎖,在其餘線程使用完寫鎖的時候,讀鎖是看不見的,爲了保證線程安全,因此不支持讀鎖升級成寫鎖

 

到此AQS就寫完了,由於AQS涉及的知識太多,能看到如今的也都是大神了,恭喜大家,掌握了併發編程的半壁江上,爲了本身的夢想更近了一步,加油,由於知識點多,因此你們多看幾遍,不理解的能夠百度,也能夠評論區提問

 

做者:彼岸舞

時間:2020\11\18

內容關於:併發編程

本文來源於網絡,只作技術分享,一律不負任何責任

相關文章
相關標籤/搜索