《提高能力,漲薪可待》-Java併發之AQS全面詳解

歡迎關注掘金:【Ccww】,一塊兒學習
提高能力,漲薪可待
面試知識,工做可待
實戰演練,拒絕996
也歡迎關注微信公衆號【Ccww筆記】,原創技術文章第一時間推出
若是此文對你有幫助、喜歡的話,那就點個讚唄!html

前言

是否是感受在工做上難於晉升了呢?
是否是感受找工做面試是那麼難呢?
是否是感受本身天天都在996加班呢?node

在工做上必須保持學習的能力,這樣才能在工做獲得更好的晉升,漲薪指日可待,歡迎一塊兒學習【提高能力,漲薪可待】系列
在找工做面試應在學習的基礎進行總結面試知識點,工做也指日可待,歡迎一塊兒學習【面試知識,工做可待】系列
最後,理論知識到準備充足,是否是該躬行起來呢?歡迎一塊兒學習【實戰演練,拒絕996】系列面試

1、AQS是什麼?有什麼用?

​ AQS全稱AbstractQueuedSynchronizer,即抽象的隊列同步器,是一種用來構建鎖和同步器的框架。api

基於AQS構建同步器:安全

  • ReentrantLock
  • Semaphore
  • CountDownLatch
  • ReentrantReadWriteLock
  • SynchronusQueue
  • FutureTask

優點:微信

  • AQS 解決了在實現同步器時涉及的大量細節問題,例如自定義標準同步狀態、FIFO 同步隊列。
  • 基於 AQS 來構建同步器能夠帶來不少好處。它不只可以極大地減小實現工做,並且也沒必要處理在多個位置上發生的競爭問題。

2、AQS核心知識

2.1 AQS核心思想

  若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。如圖所示:併發

Sync queue: 同步隊列,是一個雙向列表。包括head節點和tail節點。head節點主要用做後續的調度。框架

Condition queue: 非必須,單向列表。當程序中存在cindition的時候纔會存在此列表。

2.2 AQS設計思想

  • AQS使用一個int成員變量來表示同步狀態
  • 使用Node實現FIFO隊列,能夠用於構建鎖或者其餘同步裝置
  • AQS資源共享方式:獨佔Exclusive(排它鎖模式)和共享Share(共享鎖模式)

AQS它的全部子類中,要麼實現並使用了它的獨佔功能的api,要麼使用了共享鎖的功能,而不會同時使用兩套api,即使是最有名的子類ReentrantReadWriteLock也是經過兩個內部類讀鎖和寫鎖分別實現了兩套api來實現的學習

2.3 state狀態

state狀態使用volatile int類型的變量,表示當前同步狀態。state的訪問方式有三種:ui

  • getState()
  • setState()
  • compareAndSetState()

2.4 AQS中Node常量含義

CANCELLED
waitStatus值爲1時表示該線程節點已釋放(超時、中斷),已取消的節點不會再阻塞。

SIGNAL
waitStatus爲-1時表示該線程的後續線程須要阻塞,即只要前置節點釋放鎖,就會通知標識爲 SIGNAL 狀態的後續節點的線程

CONDITION
waitStatus爲-2時,表示該線程在condition隊列中阻塞(Condition有使用)

PROPAGATE
waitStatus爲-3時,表示該線程以及後續線程進行無條件傳播(CountDownLatch中有使用)共享模式下, PROPAGATE 狀態的線程處於可運行狀態

2.5 同步隊列爲何稱爲FIFO呢?

由於只有前驅節點是head節點的節點才能被首先喚醒去進行同步狀態的獲取。當該節點獲取到同步狀態時,它會清除本身的值,將本身做爲head節點,以便喚醒下一個節點。

2.6 Condition隊列

​ 除了同步隊列以外,AQS中還存在Condition隊列,這是一個單向隊列。調用ConditionObject.await()方法,可以將當前線程封裝成Node加入到Condition隊列的末尾,而後將獲取的同步狀態釋放(即修改同步狀態的值,喚醒在同步隊列中的線程)。

Condition隊列也是FIFO。調用ConditionObject.signal()方法,可以喚醒firstWaiter節點,將其添加到同步隊列末尾。

2.7 自定義同步器的實現

在構建自定義同步器時,只須要依賴AQS底層再實現共享資源state的獲取與釋放操做便可。自定義同步器實現時主要實現如下幾種方法:

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。

三 AQS實現細節

線程首先嚐試獲取鎖,若是失敗就將當前線程及等待狀態等信息包裝成一個node節點加入到FIFO隊列中。 接着會不斷的循環嘗試獲取鎖,條件是當前節點爲head的直接後繼纔會嘗試。若是失敗就會阻塞本身直到本身被喚醒。而當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程。

3.1 獨佔模式下的AQS

所謂獨佔模式,即只容許一個線程獲取同步狀態,當這個線程尚未釋放同步狀態時,其餘線程是獲取不了的,只能加入到同步隊列,進行等待。

很明顯,咱們能夠將state的初始值設爲0,表示空閒。當一個線程獲取到同步狀態時,利用CAS操做讓state加1,表示非空閒,那麼其餘線程就只能等待了。釋放同步狀態時,不須要CAS操做,由於獨佔模式下只有一個線程能獲取到同步狀態。ReentrantLock、CyclicBarrier正是基於此設計的。

例如,ReentrantLock,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。

獨佔模式下的AQS是不響應中斷的,指的是加入到同步隊列中的線程,若是由於中斷而被喚醒的話,不會當即返回,而且拋出InterruptedException。而是再次去判斷其前驅節點是否爲head節點,決定是否爭搶同步狀態。若是其前驅節點不是head節點或者爭搶同步狀態失敗,那麼再次掛起。

3.1.1 獨佔模式獲取資源-acquire方法

acquire以獨佔exclusive方式獲取資源。若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。源碼以下:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

流程圖:

  • 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;
  • 沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
  • acquireQueued()使線程在等待隊列中休息,有機會時(輪到本身,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
  • 若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。

3.1.2 獨佔模式獲取資源-tryAcquire方法

tryAcquire嘗試以獨佔的方式獲取資源,若是獲取成功,則直接返回true,不然直接返回false,且具體實現由自定義AQS的同步器實現的。

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
複製代碼

3.1.3 獨佔模式獲取資源-addWaiter方法

  根據不一樣模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)建立結點並以CAS的方式將當前線程節點加入到不爲空的等待隊列的末尾(經過compareAndSetTail()方法)。若是隊列爲空,經過enq(node)方法初始化一個等待隊列,並返回當前節點。

/**
* 參數
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* 返回值
* @return the new node
*/
private Node addWaiter(Node mode) {
    //將當前線程以指定的模式建立節點node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 獲取當前同隊列的尾節點
    Node pred = tail;
    //隊列不爲空,將新的node加入等待隊列中
    if (pred != null) {
        node.prev = pred;
         //CAS方式將當前節點尾插入隊列中
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //當隊列爲empty或者CAS失敗時會調用enq方法處理
    enq(node);
    return node;
}
複製代碼

其中,隊列爲empty,使用enq(node)處理,將當前節點插入等待隊列,若是隊列爲空,則初始化當前隊列。全部操做都是CAS自旋的方式進行,直到成功加入隊尾爲止。

private Node enq(final Node node) {
        //不斷自旋
        for (;;) {
            Node t = tail;
            //當前隊列爲empty
            if (t == null) { // Must initialize
             //完成隊列初始化操做,頭結點中不放數據,只是做爲起始標記,lazy-load,在第一次用的時候new
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //不斷將當前節點使用CAS尾插入隊列中直到成功爲止
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

3.1.4 獨佔模式獲取資源-acquireQueued方法

acquireQueued用於已在隊列中的線程以獨佔且不間斷模式獲取state狀態,直到獲取鎖後返回。主要流程:

  • 結點node進入隊列尾部後,檢查狀態;

  • 調用park()進入waiting狀態,等待unpark()或interrupt()喚醒;

  • 被喚醒後,是否獲取到鎖。若是獲取到,head指向當前結點,並返回從入隊到獲取鎖的整個過程當中是否被中斷過;若是沒獲取到,繼續流程1

    final boolean acquireQueued(final Node node, int arg) {
      //是否已獲取鎖的標誌,默認爲true 即爲還沒有
      boolean failed = true;
      try {
          //等待中是否被中斷過的標記
          boolean interrupted = false;
          for (;;) {
              //獲取前節點
              final Node p = node.predecessor();
              //若是當前節點已經成爲頭結點,嘗試獲取鎖(tryAcquire)成功,而後返回
              if (p == head && tryAcquire(arg)) {
                  setHead(node);
                  p.next = null; // help GC
                  failed = false;
                  return interrupted;
              }
              //shouldParkAfterFailedAcquire根據對當前節點的前一個節點的狀態進行判斷,對當前節點作出不一樣的操做
              //parkAndCheckInterrupt讓線程進入等待狀態,並檢查當前線程是否被能夠被中斷
              if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                  interrupted = true;
          }
      } finally {
          //將當前節點設置爲取消狀態;取消狀態設置爲1
          if (failed)
              cancelAcquire(node);
      }
    }
    複製代碼

3.1.5 獨佔模式釋放資源-release方法

release方法是獨佔exclusive模式下線程釋放共享資源的鎖。它會調用tryRelease()釋放同步資源,若是所有釋放了同步狀態爲空閒(即state=0),當同步狀態爲空閒時,它會喚醒等待隊列裏的其餘線程來獲取資源。這也正是unlock()的語義,固然不只僅只限於unlock().

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

3.1.6 獨佔模式釋放資源-tryRelease方法

tryRelease()tryAcquire()同樣實現都是由自定義定時器以獨佔exclusive模式實現的。由於其是獨佔模式,不須要考慮線程安全的問題去釋放共享資源,直接減掉相應量的資源便可(state-=arg)。並且tryRelease()的返回值表明着該線程是否已經完成資源的釋放,所以在自定義同步器的tryRelease()時,須要明確這條件,當已經完全釋放資源(state=0),要返回true,不然返回false。

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
複製代碼

ReentrantReadWriteLock的實現:

protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //減掉相應量的資源(state-=arg)
        int nextc = getState() - releases;
        //是否徹底釋放資源
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
複製代碼

3.1.7 獨佔模式釋放資源-unparkSuccessor

unparkSuccessor用unpark()喚醒等待隊列中最前驅的那個未放棄線程,此線程並不必定是當前節點的next節點,而是下一個能夠用來喚醒的線程,若是這個節點存在,調用unpark()方法喚醒。

private void unparkSuccessor(Node node) {
    //當前線程所在的結點node
    int ws = node.waitStatus;
    //置零當前線程所在的結點狀態,容許失敗
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //找到下一個須要喚醒的結點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從後向前找
        for (Node t = tail; t != null && t != node; t = t.prev)
            //從這裏能夠看出,<=0的結點,都是還有效的結點
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
         //喚醒
        LockSupport.unpark(s.thread);
}
複製代碼

3.2 共享模式下的AQS

共享模式,固然是容許多個線程同時獲取到同步狀態,共享模式下的AQS也是不響應中斷的.

很明顯,咱們能夠將state的初始值設爲N(N > 0),表示空閒。每當一個線程獲取到同步狀態時,就利用CAS操做讓state減1,直到減到0表示非空閒,其餘線程就只能加入到同步隊列,進行等待。釋放同步狀態時,須要CAS操做,由於共享模式下,有多個線程能獲取到同步狀態。CountDownLatch、Semaphore正是基於此設計的。

例如,CountDownLatch,任務分爲N個子線程去執行,同步狀態state也初始化爲N(注意N要與線程個數一致):  

3.2.1 共享模式獲取資源-acquireShared方法

acquireShared在共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
複製代碼

流程:

  • 先經過tryAcquireShared()嘗試獲取資源,成功則直接返回;
  • 失敗則經過doAcquireShared()中的park()進入等待隊列,直到被unpark()/interrupt()併成功獲取到資源才返回(整個等待過程也是忽略中斷響應)。

3.2.2 共享模式獲取資源-tryAcquireShared方法

tryAcquireShared()跟獨佔模式獲取資源方法同樣實現都是由自定義同步器去實現。但AQS規範中已定義好tryAcquireShared()的返回值:

  • 負值表明獲取失敗;
  • 0表明獲取成功,但沒有剩餘資源;
  • 正數表示獲取成功,還有剩餘資源,其餘線程還能夠去獲取。
protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
複製代碼

3.2.3 共享模式獲取資源-doAcquireShared方法

doAcquireShared()用於將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。

private void doAcquireShared(int arg) {
    //加入隊列尾部
    final Node node = addWaiter(Node.SHARED);
    //是否成功標誌
    boolean failed = true;
    try {
        //等待過程當中是否被中斷過的標誌
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//獲取前驅節點
            if (p == head) {//若是到head的下一個,由於head是拿到資源的線程,此時node被喚醒,極可能是head用完資源來喚醒本身的
                int r = tryAcquireShared(arg);//嘗試獲取資源
                if (r >= 0) {//成功
                    setHeadAndPropagate(node, r);//將head指向本身,還有剩餘資源能夠再喚醒以後的線程
                    p.next = null; // help GC
                    if (interrupted)//若是等待過程當中被打斷過,此時將中斷補上。
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            
            //判斷狀態,隊列尋找一個適合位置,進入waiting狀態,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }   
}
複製代碼

3.2.4 共享模式釋放資源-releaseShared方法

releaseShared()用於共享模式下線程釋放共享資源,釋放指定量的資源,若是成功釋放且容許喚醒等待線程,它會喚醒等待隊列裏的其餘線程來獲取資源。

public final boolean releaseShared(int arg) {
    //嘗試釋放資源
    if (tryReleaseShared(arg)) {
        //喚醒後繼結點
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

獨佔模式下的tryRelease()在徹底釋放掉資源(state=0)後,纔會返回true去喚醒其餘線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制必定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就能夠喚醒後繼等待結點。
https://www.cnblogs.com/waterystone/p/4920797.html

3.2.共享模式釋放資源-doReleaseShared方法

doReleaseShared()主要用於喚醒後繼節點線程,當state爲正數,去獲取剩餘共享資源;當state=0時去獲取共享資源。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                    //喚醒後繼
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // head發生變化
        if (h == head)
            break;
    }
}
複製代碼

歡迎關注微信公衆號【Ccww筆記】,原創技術文章第一時間推出

相關文章
相關標籤/搜索