連肝4天,這瞬間戳中面試官當心心的AQS大餐,給你們安排上!

點讚的靚仔,你時人羣中最閃耀的光芒

前言

AQS,英文全稱AbstractQueuedSynchronizer,直接翻譯爲抽象的隊列同步器。是JDK1.5出現的一個用於解決併發問題的工具類,由大名鼎鼎的Doug Lea打造,與synchornized關鍵字不一樣的是,AQS是經過代碼解決併發問題。html

回顧併發問題

併發問題是指在多線程運行環境下,共享資源安全的問題。
如今的銀行帳戶,經過銀行卡和手機銀行均可以操做帳戶, 若是咱們同時拿着銀行卡和存摺去銀行搞事情,會怎麼樣呢?java

package demo.pattren.aqs;

public class Money {
    /**
     * 假設如今帳戶有1000塊錢
     */
    private int money = 1000;
    /**
     * 取錢
     */
    public void drawMoney(){
        this.money--;
    }
    public static void main(String[] args) throws InterruptedException {
        Money money = new Money();
        for(int i=0; i<1000; i++){
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                money.drawMoney();
            },i + "").start();
        }
        Thread.sleep(2000);
        System.out.println("當前帳戶餘額:" + money.money);
    }
}


這樣想着是否是立刻能夠去銀行搞一波事情? 哈哈,你想太多了,若是能這樣搞,銀行早破產了。咱們主要是來分析一下出現這個問題的緣由,JVM內存是JMM結構的,每一個線程操做的數據是從主內存中複製的一個和備份,而多個線程就會存在多個備份,當線程中的備份數據被修改時,會將值刷新到主內存,好比多個線程同時獲取到了帳戶的餘額爲500元,A線程存錢100,線程A將600刷新到主內存,$\color{red}{主內存並不會主動通知其餘線程此時值已經被修改}$,因此主內存的值此時與其餘線程的值是不一樣的,若是其餘線程再操做帳戶餘額,是在500的基礎上進行的,這顯然不是咱們想要的結果。
node

解決併發問題

JDK提供了多種解決多線程安全的方式。安全

volatile關鍵字

volatile是JDK提供的關鍵字,用來修飾變量,volatile修飾的變量可以保證多個線程下的可見性,如上個案例,A修改了帳戶的餘額,而後將最新的值刷新到主內存,此時主內存會將最新的值同步到其餘線程。

volatile解決了多線程下數據讀取一致的問題,$\color{red}{即保證可見性,可是其並不能保證寫操做的原子性}$,

當多個線程同時寫操做的時候,即多個線程同時去將線程中最新的值刷新到主內存,將會出現問題。

經過volatile關鍵字修飾money變量,發下並不能解決線程安全問題。多線程

原子操做類

原子操做類是JDK提供的一系列保證原子操做的工具類,原子類能夠保證多線程環境下對其值的操做是安全的。併發

package demo.pattren.aqs;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMoney {
    /**
     * 假設如今帳戶有1000塊錢
     */
    private AtomicInteger money = new AtomicInteger(1000);
    /**
     * 取錢
     */
    public void drawMoney(){
        //AtomicInteger的自減操做
        this.money.getAndDecrement();
    }
    public static void main(String[] args) throws InterruptedException {
        AtomicMoney money = new AtomicMoney();
        for(int i=0; i<1000; i++){
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                money.drawMoney();
            },i + "").start();
        }
        Thread.sleep(2000);
        System.out.println("當前帳戶餘額:" + money.money);
    }
}


屢次測試結果都是0,與預期一致。原子操做類是使用CAS(Compare and swap 比較並替換)的機制來保證操做的原子性,相對於鎖,他的併發性更高。jvm

synchronized關鍵字

synchronized關鍵字是jvm層面來保證線程安全的,經過在代碼塊先後添加monitorenter與monitorexit命令來保證線程的安全,並且在JDK1.6對synchronized關鍵字作了較大的優化,性能有了較大的提高。能夠肯定的是,經過synchronized確定能夠保證線程安全,因此使用synchronized也是很好的選擇,固然synchronized鎖的升級不可逆特徵,致使在高併發下性能是不能很好的保證。高併發

Lock鎖

終於迎來了本篇文章的主角,前面的內容,其實與文章的主題AQS並無直接的關聯,就簡單帶過。前面不少都是JVM層面來保證線程安全的,而AQS則是徹底經過代碼層面來處理線程安全的。
(PS:小節標題明明是Lock鎖,怎麼寫AQS了,騙我讀書少)
工具

博主怕捱打,正在全力解釋中~。先上類圖壓場!

如上圖,左邊是抽象隊列同步器,而右邊則是使用隊列同步器實現的功能——鎖、信號量、發令槍等。
能夠先不看源碼,我們本身思考,要以純代碼的方式實現應當考慮哪些問題?性能

  1. 線程互斥:可使用state狀態進行判斷,state=0,則能夠獲取到鎖,state>0,則不能獲取。
  2. 排隊等候:不能獲取鎖的線程應當存儲起來,當鎖釋放後能夠繼續獲取鎖執行。
  3. 線程喚醒:當鎖釋放後,處於等待狀態的線程應當被喚醒。
  4. 鎖重入 : 如何解決同一個進入多個加鎖的方法(不解決的話分分鐘死鎖給你看)。

對於一、2兩點,難度應帶不大,而三、4兩點如何去設計呢?咱們經過僞代碼預演操做流程。

在業務端,是這樣操做的。

加鎖
  {須要被鎖住的代碼}
  釋放鎖

加鎖與釋放鎖的邏輯

if(state == 0)
      獲取到鎖
      set(state == 1)
    else
      繼續等待
      while(true){
           if(state == 0)
             再次嘗試獲取鎖
      }

這樣設計以後,整個操做流程再次變成了串行操做。

這和咱們去食堂排隊打飯是同樣的,食堂不可能爲每一個學生都開放一個窗口,因此多個學生就會爭搶有限的窗口,若是沒有必定的控制,那麼食堂每到吃飯的時候都是亂套的,一羣學生圍着窗口同時去打飯,想一想都是多麼的恐怖。而由此出現了排隊的機制,一個窗口同一時間打飯的人只能有一個,當前一我的離開窗口後,後面排隊的學生才能去打飯。

源碼解讀

下面咱們深刻JDK源碼,領略大師級的代碼設計。
業務調用代碼:

package demo.aqs;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockMoney {
    Lock lock = new ReentrantLock();
    /**
     * 假設如今帳戶有1000塊錢
     */
    private int money = 1000;
    //private int money = 1000;
    /**
     * 取錢
     */
    public void drawMoney(){
        lock.lock();
        this.money--;
        lock.unlock();
    }
    public static void main(String[] args) throws InterruptedException {
        LockMoney money = new LockMoney();
        for(int i=0; i<1000; i++){
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                money.drawMoney();
            },i + "").start();
        }
        Thread.sleep(2000);
        System.out.println("當前帳戶餘額:" + money.money);
    }
}

追蹤Lock方法:
直接看源碼基本一下子就暈車,我嘗試繪製出lock方法的調用鏈路。而後結合源碼解釋。


你們跟着箭頭走一遍源碼,多多少少可以體會到AQS的實現機制。

NonfairSync.lock

final void lock() {
    //CAS嘗試將state從0更新爲1,更新成功則執行if下面的代碼。
    if (compareAndSetState(0, 1))
        //獲取鎖成功,執行線程執行
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //獲取鎖失敗,線程入隊列
        acquire(1);
}

看到這段代碼,是否是瞬間明白前面提到的一、2兩點問題。首先compareAndSetState方法是使用Unsafe直接操做內存而且使用樂觀鎖的方式,可以保證有且僅有一個線程可以操做成功,是多線程安全的。即設置將state設置爲1成功的線程可以搶佔到鎖(線程互斥),而沒有設置成功的線程將進行入隊操做(排隊等候),這樣感受瞬間明朗了許多,那咱們接着往下看。

AbstractQueuedSynchronizor.acquire

public final void acquire(int arg) {
    //tryAcquire失敗而且acquireQueued成功,則調用selfInterrupt
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //當線程獲取鎖失敗而且線程阻塞失敗會中斷線程
        selfInterrupt();
}

AbstractQueuedSynchronizor的tryAcquire方法,其最終調用到了Sync的nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //獲取當前鎖的狀態值
    int c = getState();
    // state = 0,表示當前鎖爲空閒狀態,其實這一段代碼和前面lock的方法是同樣的
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //不等於0 則判斷當前線程是否爲持有鎖的線程,若是是則執行代碼,這裏解決了重入鎖問題
    else if (current == getExclusiveOwnerThread()) {
        //當前狀態值 + 1(能夠看前面的傳參)
        int nextc = c + acquires;
        // 囧, 這裏是超出了int的最大值纔會出現這樣的狀況
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //更新state的值
        setState(nextc);
        return true;
    }
    return false;
}

經過閱讀源碼,能夠發現,tryAcquire方法在當前線程獲取鎖成功或者是重入鎖的狀況下返回true,不然返回false。而同時這個方法解決了上面提到的第4點鎖重入的問題。ok,感受愈來愈接近真相了,接着看addWaiter方法。
理解addWaiter方法的代碼,先看方法中用的得Node對象。 Node對象是對Thread對象的封裝,使其具備線程的功能,同時他還有prev、next等屬性。那麼很明瞭,Node是一個鏈表結構的對象

//前一個結點
   volatile Node prev;
   //下一個結點
   volatile Node next;

同時AbstractQueuedSynchronizor中包含head、tail屬性

//Node鏈表的頭結點
 private transient volatile Node head;
 //Node鏈表的尾結點
 private transient volatile Node tail;
private Node addWaiter(Node mode) {
    //將當前線程包裝爲Node對象
    Node node = new Node(Thread.currentThread(), mode);
    //獲取尾節點,當這段代碼第一次運行的時候,並無尾結點
    //因此確定值爲null,那麼會執行下面的enq方法
    Node pred = tail;
    //當再次運行代碼的時候,尾結點再也不爲null(enq方法初始化了尾結點,能夠先往下看enq方法源碼)
    if (pred != null) {
        //當前結點的前置結點指向以前的尾結點
        node.prev = pred;
        //CAS嘗試將尾結點從pred設置爲node
        if (compareAndSetTail(pred, node)) {
            //設置成功則將pred的next結點執行node
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

上面的解釋聽着有點繞腦殼。

不着急,咱們先看enq方法

private Node enq(final Node node) {
    //死循環
    for (;;) {
        //獲取尾結點
        Node t = tail;
        //尾結點爲空,則初始化尾結點和頭結點爲同一個新建立的Node對象
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //將當前結點設爲爲尾結點,並將前一個尾結點的next指向當前結點
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                //退出循環
                return t;
            }
        }
    }
}

enq具體作了什麼事情呢:

  1. 第一次循環,初始化頭結點與尾結點 new Node()
  2. 第二次循環,將當前線程封裝的Node對象設置爲尾結點,並將前一個尾結點的next指向此Node

這裏須要一些時間 + 空間的想象力,但若是對鏈表結構比較熟悉的話,這裏理解也是不太困難的。
咱們動態的想想執行過程:

  1. 第一個線程進入lock方法,此時是確定能夠獲取到鎖,直接執行,不會進入到addWaiter方法
  2. 第二個線程進入lock方法,咱們假設第一個線程尚未釋放鎖,此時進入執行enq方法,enq進行鏈表的初始化。

  1. 第三個線程以及更多的線程進入lock方法,此時再也不執行enq方法,而是在初始化以後的鏈表進行連接。

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
  //局部變量
  boolean failed = true;
  try {
      //局部變量
      boolean interrupted = false;
      //死循環
      for (;;) {
          //獲取前置結點
          final Node p = node.predecessor();
          //前置結點爲head而且嘗試獲取鎖成功,則不阻塞
          if (p == head && tryAcquire(arg)) {
              setHead(node);
              p.next = null; // help GC
              failed = false;
              return interrupted;
          }
          //阻塞操做 , 判斷是否應該阻塞 而且 阻塞是否成功
          if (
                //是否在搶佔鎖失敗後阻塞
              shouldParkAfterFailedAcquire(p, node) &&
              //Unsafe操做使線程阻塞
              parkAndCheckInterrupt())
              interrupted = true;
      }
  } finally {
      if (failed)
          cancelAcquire(node);
  }
}

shouldParkAfterFailedAcquire分析

//Node pred 前置結點, Node node 當前結點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取前置結點的等待狀態
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 喚醒信號,即前結點正常,就設置waitStatus爲SIGNAL,表示前置結點能夠喚醒當前結點,那          * 麼當前結點纔會安心的被阻塞(若是前置結點不正常,可能就會致使本身不能被喚醒,那確定不          * 能安心睡覺的)
         */
        return true;
    if (ws > 0) {
        /*
         * 找到前置結點中waitStatus <= 0 的Node結點並設置爲當前結點的前置結點
         * 此狀態表示結點不是處於正常狀態,那麼將他從鏈表中刪除,直到找到狀態正常的結點
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 當waitStatus = 0 或者 PROPAGATE(-3) 時,CAS設置值爲SIGNAL(-1)
         * 此狀態表示線程正常,但沒有設置喚醒,通常爲tail的前一個結點,那麼須要將其設置爲可喚醒          * 狀態(SIGNAL)
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

圖解以下。

至此,咱們瞭解了AQS對須要等待的線程存儲的過程。
而AQS的解鎖以及公平鎖、非公平鎖,共享鎖、獨享鎖等後續跟上。

參考資料:

https://www.cnblogs.com/water...
https://www.jianshu.com/p/d61...

相關文章
相關標籤/搜索