領統Java併發半壁江山的AQS你真的懂了嗎?

領統Java併發半壁江山的AQS你真的懂了嗎?

1、JUC的由來

synchronized 關鍵字是JDK官方人員用C++代碼寫的,在JDK6之前是重量級鎖。Java大牛 Doug Lea對 synchronized 在併發編程條件下的性能表現不滿意就本身寫了個JUC,以此來提高併發性能,本文要講的就是JUC併發包下的AbstractQueuedSynchronizernode

在JUC中 CountDownLatch、ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock 等底層用的都是AQS,AQS幾乎佔據了JUC併發包裏的半壁江山,若是想要獲取鎖能夠被中斷、超時獲取鎖、嘗試獲取鎖那就用AQS吧。編程

**2、AQS前置知識點

2.一、模板方法設計模式

AbstractQueuedSynchronizer是個抽象類,全部用到方法的類都要繼承此類的若干方法,對應的設計模式就是模版模式安全

模版模式定義:一個抽象類公開定義了執行它的方法的方式/模板。它的子類能夠按須要重寫方法實現,但調用將以抽象類中定義的方式進行。這種類型的設計模式屬於行爲型模式。多線程

抽象類:併發

public abstract class SendCustom {
 public abstract void to();
 public abstract void from();
 public void date() {
  System.out.println(new Date());
 }
 public abstract void send();
 // 注意此處 框架方法-模板方法
 public void sendMessage() {
  to();
  from();
  date();
  send();
 }
}

模板方法派生類:框架

public class SendSms extends SendCustom {

 @Override
 public void to() {
  System.out.println("sowhat");
 }

 @Override
 public void from() {
  System.out.println("xiaomai");
 }

 @Override
 public void send() {
  System.out.println("Send message");
 }

 public static void main(String[] args) {
  SendCustom sendC = new SendSms();
  sendC.sendMessage();
 }
}

2.二、LookSupportide

LockSupport 是一個線程阻塞工具類,全部的方法都是靜態方法,可讓線程在任意位置阻塞,固然阻塞以後確定得有喚醒的方法。經常使用方法以下:函數

public static void park(Object blocker); // 暫停當前線程
public static void parkNanos(Object blocker, long nanos); // 暫停當前線程,不過有超時時間的限制
public static void parkUntil(Object blocker, long deadline); // 暫停當前線程,直到某個時間
public static void park(); // 無期限暫停當前線程
public static void parkNanos(long nanos); // 暫停當前線程,不過有超時時間的限制
public static void parkUntil(long deadline); // 暫停當前線程,直到某個時間
public static void unpark(Thread thread); // 恢復當前線程
public static Object getBlocker(Thread t);

park是由於park英文意思爲停車。咱們若是把Thread當作一輛車的話,park就是讓車停下,unpark就是讓車啓動而後跑起來。高併發

與Object類的wait/notify機制相比,park/unpark有兩個優勢:

1.以thread爲操做對象更符合阻塞線程的直觀定義
2.操做更精準,能夠準確地喚醒某一個線程(notify隨機喚醒一個線程,notifyAll 喚醒全部等待的線程),增長了靈活性。

park/unpark調用的是 Unsafe(提供CAS操做) 中的 native代碼。

park/unpark 功能在Linux系統下是用的Posix線程庫pthread中的mutex(互斥量),condition(條件變量)來實現的。mutexcondition保護了一個 _counter 的變量,當 park 時,這個變量被設置爲0。當unpark時,這個變量被設置爲1。

2.三、CAS

CAS 是 CPU指令級別實現了原子性的比較和交換(Conmpare And Swap)操做,注意CAS不是鎖只是CPU提供的一個原子性操做指令。

領統Java併發半壁江山的AQS你真的懂了嗎?

CAS在語言層面不進行任何處理,直接將原則操做實如今硬件級別實現,之因此能夠實現硬件級別的操做核心是由於CAS操做類中有個核心類UnSafe類。

關於CAS引起的ABA問題、性能開銷問題、只能保證一個共享變量之間的原則性操做問題,之前 CAS 中寫過,在此再也不重複講解。

注意:並非說 CAS 必定比SYN好,若是高併發執行時間久 ,用SYN好, 由於SYN底層用了wait() 阻塞後是不消耗CPU資源的。若是鎖競爭不激烈說明自旋不嚴重,此時用CAS。

3、AQS重要方法

模版方法分爲獨佔式共享式,子類根據須要不一樣調用不一樣的模版方法(講解有點多,想看底層可直接下滑到第四章節)。

3.1 模板方法

3.1.1 獨佔式獲取

3.1.1.1 accquire

不可中斷獲取鎖accquire是獲取獨佔鎖方法,acquire嘗試獲取資源,成功則直接返回,不成功則進入等待隊列,這個過程不會被線程中斷,被外部中斷也不響應,獲取資源後纔再進行自我中斷selfInterrupt()

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

1.acquire(arg) tryAcquire(arg) 顧名思義,它就是嘗試獲取鎖,須要咱們本身實現具體細節,通常要求是:

若是該鎖沒有被另外一個線程保持,則獲取該鎖並當即返回,將鎖的保持計數設置爲 1。

若是當前線程已經保持該鎖,則將保持計數加 1,而且該方法當即返回。

若是該鎖被另外一個線程保持,則出於線程調度的目的,禁用當前線程,而且在得到鎖以前,該線程將一直處於休眠狀態,此時鎖保持計數被設置爲 1。

2.addWaiter(Node.EXCLUSIVE)

主要功能是 一旦嘗試獲取鎖未成功,就要使用該方法將其加入同步隊列尾部,因爲可能有多個線程併發加入隊尾產生競爭,所以採用compareAndSetTail鎖方法來保證同步

3.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

一旦加入同步隊列,就須要使用該方法,自旋阻塞 喚醒來不斷的嘗試獲取鎖,直到被中斷或獲取到鎖。

3.1.1.2 acquireInterruptibly

可中斷獲取鎖acquireInterruptibly相比於acquire支持響應中斷。

一、若是當前線程未被中斷,則嘗試獲取鎖。

二、若是鎖空閒則獲鎖並當即返回,state = 1。

三、若是當前線程已持此鎖,state + 1,而且該方法當即返回。

四、若是鎖被另外一個線程保持,出於線程調度目的,禁用當前線程,線程休眠ing,除非鎖由當前線程得到或者當前線程被中斷了,中斷後會拋出InterruptedException,而且清除當前線程的已中斷狀態。

五、此方法是一個顯式中斷點,因此要優先考慮響應中斷。

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
     throw new InterruptedException(); // acquireInterruptibly 選擇
      interrupted = true; // acquire 的選擇

3.1.1.3 tryAcquireNanos

該方法能夠被中斷,增長了超時則失敗的功能。能夠說該方法的實現與上述兩方法沒有任何區別。時間功能上就是用的標準超時功能,若是剩餘時間小於0那麼acquire失敗,若是該時間大於一次自旋鎖時間(spinForTimeoutThreshold = 1000),而且能夠被阻塞,那麼調用LockSupport.parkNanos方法阻塞線程。

doAcquireNanos內部:

nanosTimeout = deadline - System.nanoTime();
  if (nanosTimeout <= 0L)
      return false;
  if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
      LockSupport.parkNanos(this, nanosTimeout);
  if (Thread.interrupted())
      throw new InterruptedException();

該方法通常會有如下幾種狀況產生:

1.在指定時間內,線程獲取到鎖,返回true。
2.當前線程在超時時間內被中斷,拋中斷異常後,線程退出。
3.到截止時間後線程仍未獲取到鎖,此時線程得到鎖失敗,再也不等待直接返回false。

3.1.2 共享式獲取

3.1.2.1 acquireShared

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

該模版方法的工做:

1.調用tryAcquireShared(arg) 嘗試得到資源,返回值表明以下含義:

負數表示失敗。

0 表示成功,但沒有剩餘可用資源。

正數表示成功,且有剩餘資源。

doAcquireShared做用:

建立節點而後加入到隊列中去,這一塊和獨佔模式下的 addWaiter 代碼差很少,不一樣的是結點的模式是SHARED,在獨佔模式 EXCLUSIVE。

3.1.2.2 acquireSharedInterruptibly

無非就是可中斷性的共享方法

public final void acquireSharedInterruptibly(long arg)  throws InterruptedException {
    if (Thread.interrupted()) // 若是線程被中斷,則拋出異常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)  
        // 若是tryAcquireShared()方法獲取失敗,則調用以下的方法
        doAcquireSharedInterruptibly(arg);
}

3.1.2.3. tryAcquireSharedNanos

嘗試以共享模式獲取,若是被中斷則停止,若是超過給定超時期則失敗。實現此方法首先要檢查中斷狀態,而後至少調用一次 tryacquireshared(long),並在成功時返回。不然,在成功、線程中斷或超過超時期以前,線程將加入隊列,可能反覆處於阻塞或未阻塞狀態,並一直調用 tryacquireshared(long)

public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

3.1.3 獨佔式釋放

獨佔鎖的釋放調用unlock方法,而該方法實際調用了AQS的release方法,這段代碼邏輯比較簡單,若是同步狀態釋放成功(tryRelease返回true)則會執行if塊中的代碼,當head指向的頭結點不爲null,而且該節點的狀態值不爲0的話纔會執行unparkSuccessor()方法。

public final boolean release(long arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

3.1.4 共享式釋放

releaseShared首先去嘗試釋放資源tryReleaseShared(arg),若是釋放成功了,就表明有資源空閒出來,那麼就用doReleaseShared()去喚醒後續結點。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

好比CountDownLatch的countDown()具體實現:

public void countDown() {
        sync.releaseShared(1);
    }

3.2 子類需實現方法

子類要實現父類方法也分爲獨佔式共享式

3.2.1 獨佔式獲取

tryAcquire 顧名思義,就是嘗試獲取鎖,AQS在這裏沒有對其進行功能的實現,只有一個拋出異常的語句,咱們須要本身對其進行實現,能夠對其重寫實現公平鎖、不公平鎖、可重入鎖、不可重入鎖

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}

3.2.2 獨佔式釋放

tryRelease 嘗試釋放 獨佔鎖,須要子類實現。

protected boolean tryRelease(long arg) {
        throw new UnsupportedOperationException();
    }

3.2.3 共享式獲取

tryAcquireShared 嘗試進行共享鎖的得到,須要子類實現。

protected long tryAcquireShared(long arg) {
        throw new UnsupportedOperationException();
    }

3.2.4 共享式釋放

tryReleaseShared嘗試進行共享鎖的釋放,須要子類實現。

protected boolean tryReleaseShared(long arg) {
        throw new UnsupportedOperationException();
    }

3.3  狀態標誌位

state由於用 volatile 修飾 保證了咱們操做的可見性,因此任何線程經過getState()得到狀態都是能夠獲得最新值,可是setState()沒法保證原子性,所以AQS給咱們提供了compareAndSetState方法利用底層UnSafe的CAS功能來實現原子性。

private volatile long state;

    protected final long getState() {
        return state;
    }

    protected final void setState(long newState) {
        state = newState;
    }

   protected final boolean compareAndSetState(long expect, long update) {
        return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
    }

3.4 查詢是否獨佔模式

isHeldExclusively 該函數的功能是查詢當前的工做模式是不是獨佔模式。須要子類實現。

protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

3.5 自定義實現鎖

這裏須要重點說明一點,JUC中通常是用一個子類繼承自Lock,而後在子類中定義一個內部類來實現AQS的繼承跟使用

public class SowhatLock implements Lock
{
 private Sync sync = new Sync();

 @Override
 public void lock()
 {
  sync.acquire(1);
 }

 @Override
 public boolean tryLock()
 {
  return false;
 }

 @Override
 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
 {
  return sync.tryAcquireNanos(1,unit.toNanos(time));
 }

 @Override
 public void unlock()
 {
  sync.release(1);
 }

 @Override
 public Condition newCondition()
 {
  return sync.newCondition();
 }

 @Override
 public void lockInterruptibly() throws InterruptedException
 {
 }

 private class Sync extends AbstractQueuedSynchronizer
 {
  @Override
  protected boolean tryAcquire(int arg)
  {
   assert arg == 1;
   if (compareAndSetState(0, 1))
   {
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
   }
   return false;
  }

  @Override
  protected boolean tryRelease(int arg)
  {
   assert arg == 1;
   if (!isHeldExclusively())
   {
    throw new IllegalMonitorStateException();
   }
   setExclusiveOwnerThread(null);
   setState(0);
   return true;
  }

  @Override
  protected boolean isHeldExclusively()
  {
   return getExclusiveOwnerThread() == Thread.currentThread();
  }

  Condition newCondition() {
   return new ConditionObject();
  }
 }
}

自定義實現類:

public class SoWhatTest
{
 public static int m = 0;
 public  static CountDownLatch latch  = new CountDownLatch(50);
 public static Lock lock = new SowhatLock();

 public static void main(String[] args) throws  Exception
 {
  Thread[] threads = new Thread[50];
  for (int i = 0; i < threads.length ; i++)
  {
   threads[i] = new Thread(()->{
    try{
     lock.lock();
     for (int j = 0; j <100 ; j++)
     {
      m++;
     }
    }finally
    {
     lock.unlock();
    }
    latch.countDown();
  });
  }
  for(Thread t : threads) t.start();
  latch.await();
  System.out.println(m);
 }
}

4、AQS底層

4.1 CLH

CLH(Craig、 Landin、 Hagersten locks三我的名字綜合而命名):

1.是一個自旋鎖,能確保無飢餓性,提供先來先服務的公平性。
2.CLH 鎖也是一種基於鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,若是發現前驅釋放了鎖就結束自旋。

4.2 Node

CLH 隊列由Node對象組成,其中Node是AQS中的內部類。

static final class Node {
 // 標識共享鎖
 static final Node SHARED = new Node();
 // 標識獨佔鎖
 static final Node EXCLUSIVE = null;
 // 前驅節點
 volatile Node prev;
 // 後繼節點
 volatile Node next;
 // 獲取鎖失敗的線程保存在Node節點中。
 volatile Thread thread;
 // 當咱們調用了Condition後他也有一個等待隊列
 Node nextWaiter;
 //在Node節點中通常經過waitStatus得到下面節點不一樣的狀態,狀態對應下方。
 volatile int waitStatus;
 static final int CANCELLED =  1;
 static final int SIGNAL    = -1;
 static final int CONDITION = -2;
 static final int PROPAGATE = -3;

waitStatus 有以下5中狀態:

1.CANCELLED = 1

表示當前結點已取消調度。當超時或被中斷(響應中斷的狀況下),會觸發變動爲此狀態,進入該狀態後的結點將不會再變化。

2.SIGNAL = -1

表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新爲 SIGNAL。

3.CONDITION = -2

表示結點等待在 Condition 上,當其餘線程調用了 Condition 的 signal() 方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。

4.PROPAGATE = -3

共享模式下,前繼結點不只會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。

5.INITIAL = 0

新結點入隊時的默認狀態。

4.3 AQS實現

4.3.1 公平鎖和非公平鎖

銀行售票窗口營業中:

公平排隊:每一個客戶來了自動在最後面排隊,輪到本身辦理業務的時候拿出身份證等證件取票。

非公平排隊:有個旅客火車立刻開車了,他拿着本身的各類證件着急這想跟窗口工做人員說是否能夠加急辦理下,能夠的話則直接辦理,不能夠的話則去隊尾排隊去。

在JUC中一樣存在公平鎖非公平鎖通常非公平鎖效率好一些。由於非公平鎖狀態下打算搶鎖的線程不用排隊掛起了

4.3.2 AQS細節

AQS內部維護着一個FIFO的隊列,即CLH隊列,提供先來先服務的公平性。AQS的同步機制就是依靠CLH隊列實現的。CLH隊列是FIFO的雙端雙向鏈表隊列(方便尾部節點插入)。線程經過AQS獲取鎖失敗,就會將線程封裝成一個Node節點,經過CAS原子操做插入隊列尾。當有線程釋放鎖時,會嘗試讓隊頭的next節點佔用鎖,我的理解AQS具備以下幾個特色:

1.在AQS 同步隊列中 -1 表示線程在睡眠狀態
2.當前Node節點線程會把前一個Node.ws = -1。當前節點把前面節點ws設置爲-1,你能夠理解爲:你本身能知道本身睡着了嗎?只能是別人看到了發現你睡眠了
3.持有鎖的線程永遠不在隊列中
4.在AQS隊列中第二個纔是最早排隊的線程
5.若是是交替型任務或者單線程任務,即便用了Lock也不會涉及到AQS 隊列
6.不到萬不得已不要輕易park線程,很耗時的!因此排隊的頭線程會自旋的嘗試幾個獲取鎖

4.4 加鎖跟解鎖流程圖

以最經典的 ReentrantLock 爲例逐步分析下 lock 跟 unlock 底層流程圖(要原圖的話公衆號回覆:lock)。

private Lock lock = new ReentrantLock();
public void test(){
    lock.lock();
    try{
        doSomeThing();
    }catch (Exception e){
      ...
    }finally {
        lock.unlock();
    }
}

領統Java併發半壁江山的AQS你真的懂了嗎?

4.4.1 獨佔式加入同步隊列

同步器AQS中包含兩個節點類型的引用:一個指向頭結點的引用(head),一個指向尾節點的引用(tail),若是加入的節點是OK的則會直接運行該節點,當若干個線程搶鎖失敗了那麼就會搶着加入到同步隊列的尾部,由於是搶着加入這個時候用CAS來設置尾部節點。入口代碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1.tryAcquire

該方法是須要自我實現的,在上面的demo中可見一斑,就是返回是否得到了鎖。

protected final boolean tryAcquire(int acquires) {
     final Thread current = Thread.currentThread();
     int c = getState();
     if (c == 0) {
         //  是否須要加入隊列,不須要的話則嘗試CAS得到鎖,得到成功後 設置當前鎖的擁有者
         if (!hasQueuedPredecessors() &&
             compareAndSetState(0, acquires)) {
             setExclusiveOwnerThread(current);
             return true;
         }
     }
     else if (current == getExclusiveOwnerThread()) {
         // 這就是可重入鎖的實現  
         int nextc = c + acquires;
         if (nextc < 0)
             throw new Error("Maximum lock count exceeded");
         setState(nextc);
         return true;
     }
     return false;
 }

2.addWaiter(Node.EXCLUSIVE,arg)

/**
 * 若是嘗試獲取同步狀態失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),經過addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。
 */
 private Node addWaiter(Node mode) {
     // 用當前線程構造一個Node對象,mode是一個表示Node類型的字段,或者說是這個節點是獨佔的仍是共享的
     Node node = new Node(Thread.currentThread(), mode);
     // 將目前隊列中尾部節點給pred
     Node pred = tail;
     // 隊列不爲空的時候
     if (pred != null) {
         node.prev = pred;
         // 先嚐試經過AQS方式修改尾節點爲最新的節點,若是修改失敗,意味着有併發,
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     //第一次嘗試添加尾部失敗說明有併發,此時進入自旋
     enq(node);
     return node;
 }

3.自旋enq 方法將併發添加節點的請求經過CAS跟自旋將尾節點的添加變得串行化起來。說白了就是讓節點放到正確的隊尾位置。

/**
* 這裏進行了循環,若是此時存在了tail就執行同上一步驟的添加隊尾操做,若是依然不存在,
* 就把當前線程做爲head結點。插入節點後,調用acquireQueued()進行阻塞
*/
private Node enq(final Node node) {
   for (;;) {
       Node t = tail;
       if (t == null) { // Must initialize
           if (compareAndSetHead(new Node()))
               tail = head;
       } else {
           node.prev = t;
           if (compareAndSetTail(t, node)) {
               t.next = node;
               return t;
           }
       }
   }
}

4.acquireQueued 是當前Node節點線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取鎖,緣由是:

1.頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的線程釋放了同步狀態之後,將會喚醒其後繼節點,後繼節點的線程被喚醒後要檢查本身的前驅節點是否爲頭結點。
2.維護同步隊列的FIFO原則,節點進入同步隊列以後,會嘗試自旋幾回。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋檢查當前節點的前驅節點是否爲頭結點,才能獲取鎖
        for (;;) {
            // 獲取節點的前驅節點
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
            // 節點中的線程循環的檢查,本身的前驅節點是否爲頭節點
            // 只有當前節點 前驅節點是頭節點纔會 再次調用咱們實現的方法tryAcquire
            // 接下來無非就是將當前節點設置爲頭結點,移除以前的頭節點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 不然檢查前一個節點的狀態,看當前獲取鎖失敗的線程是否要掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
           //若是須要掛起,藉助JUC包下面的LockSupport類的靜態方法park掛起當前線程,直到被喚醒
                parkAndCheckInterrupt())
                interrupted = true; // 兩個判斷都是true說明 則置true
        }
    } finally {
        //若是等待過程當中沒有成功獲取資源(如timeout,或者可中斷的狀況下被中斷了),那麼取消結點在隊列中的等待。
        if (failed)
           //取消請求,將當前節點從隊列中移除
            cancelAcquire(node);
    }
}

若是成功就返回,不然就執行shouldParkAfterFailedAcquireparkAndCheckInterrupt來達到阻塞效果。

5.shouldParkAfterFailedAcquire 第二步的addWaiter()構造的新節點,waitStatus的默認值是0。此時,會進入最後一個if判斷,CAS設置pred.waitStatus SIGNAL,最後返回false。因爲返回false,第四步的acquireQueued會繼續進行循環。假設node的前繼節點pred仍然不是頭結點或鎖獲取失敗,則會再次進入shouldParkAfterFailedAcquire()。上一輪循環中已經將pred.waitStatu = -1了,則此次會進入第一個判斷條件,直接返回true,表示應該阻塞調用parkAndCheckInterrupt

那麼何時會遇到ws &gt; 0呢?當pred所維護的獲取請求被取消時(也就是node.waitStatus = CANCELLED,這時就會循環移除全部被取消的前繼節點pred,直到找到未被取消的pred。移除全部被取消的前繼節點後,直接返回false。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       int ws = pred.waitStatus; // 得到前驅節點的狀態
       if (ws == Node.SIGNAL) //此處是第二次設置
           return true;
       if (ws > 0) {
          do {
               node.prev = pred = pred.prev;
           } while (pred.waitStatus > 0);
           pred.next = node;
       } else {
          //  此處是第一次設置 unsafe級別調用設置
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       return false;
   }

6.parkAndCheckInterrupt 主要任務是暫停當前線程而後查看是否已經暫停了。

private final boolean parkAndCheckInterrupt() {
    // 調用park()使線程進入掛起狀態,何時調用了unpark再繼續執行下面
    LockSupport.park(this); 
    // 若是被喚醒,查看本身是否是已經被中斷了。
    return Thread.interrupted();
}

7.cancelAcquireacquireQueued方法的finally會判斷 failed值,正常運行時候自旋出來的時候會是false,若是中斷或者timeout了 則會是true,執行cancelAcquire,其中核心代碼是node.waitStatus = Node.CANCELLED

8.selfInterrupt

static void selfInterrupt() {
      Thread.currentThread().interrupt();
  }

4.4.2 獨佔式釋放隊列頭節點

release()會調用tryRelease方法嘗試釋放當前線程持有的鎖,成功的話喚醒後繼線程,並返回true,不然直接返回false。

public final boolean release(long arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

1.tryRelease 這個是子類須要自我實現的,沒啥說的根據業務須要實現。
2.unparkSuccessor 喚醒頭結點的後繼節點。

private void unparkSuccessor(Node node) {
   int ws = node.waitStatus; // 得到頭節點狀態
    if (ws < 0) //若是頭節點裝小於0 則將其置爲0
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next; //這個是新的頭節點
    if (s == null || s.waitStatus > 0) { 
    // 若是新頭節點不知足要求
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
        //從隊列尾部開始往前去找最前面的一個waitStatus小於0的節點
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)//喚醒後繼節點對應的線程
        LockSupport.unpark(s.thread);
}

4.4.3 AQS 中增長跟刪除形象圖

領統Java併發半壁江山的AQS你真的懂了嗎?

5、CountDownLatch底層

5.1 共享鎖 CountDownLatch底層

CountDownLatch 雖然相對簡單,但也實現了共享鎖模型。可是如何正確的吹逼 CountDownLatch  呢?若是在理解了上述流程的基礎上,從CountDownLatch入手來看AQS 中關於共享鎖的代碼還比較好看懂,在看的時候能夠 以看懂大體內容爲主,學習其設計的思路,不要陷入全部條件處理細節中,多線程環境中,對與錯有時候不是那麼容易看出來的。我的追源碼繪製了以下圖:

領統Java併發半壁江山的AQS你真的懂了嗎?

5.2 計數信號量Semaphore

Semaphore 這就是共享鎖的一個實現類,在初始化的時候就規定了共享鎖池的大小N,有一個線程得到了鎖,可用數就減小1個。有一個線程釋放鎖可用數就增長1個。若是有 >=2 的線程同時釋放鎖,則此時有多個鎖可用。這個時候就能夠 同時喚醒 兩個鎖setHeadAndPropagate (流程圖懶的繪製了)。

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
 }
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //找先驅結點
            final Node p = node.predecessor();
            if (p == head) {
                 // 嘗試獲取資源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 設置當前結點爲頭結點,而後去喚醒後續結點。注意傳播性 喚醒!
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC  釋放頭結點,等待GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;//獲取到資源
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)//若是最後沒有獲取到資源,則cancel
            cancelAcquire(node);
    }
}

5.3 ReentrantReadWriteLock

領統Java併發半壁江山的AQS你真的懂了嗎?

在 ReentrantReadWriteLock 類中也是隻有一個32位的int state來表示讀鎖跟寫鎖,如何實現的?

1.後16位用來保存獨享的寫鎖個數,第一次得到就是01,第二次重入就是10了,這樣的方式來保存。
2.可是多個線程均可以得到讀鎖,而且每一個線程可能讀屢次,如何保存?咱們用前16位來保存有多少個線程得到了讀鎖
3.每一個讀鎖線程得到的重入讀鎖個數 由內部類HoldCounter與讀鎖配套使用。

6、Condition

synchronized 可用 wait() 和 notify()/notifyAll() 方法相結合能夠實現等待/通知模式。Lock 也提供了 Condition 來提供相似的功能。

Condition是JDK5後引入的Interface,它用來替代傳統的Object的wait()/notify()實現線程間的協做,相比使用Object的wait()/notify(),使用Conditionawait()/signal()這種方式 實現線程間協做更加安全和高效。簡單說,他的做用是使得某些線程一塊兒等待某個條件(Condition),只有當該條件具有(signal 或者 signalAll方法被調用)時,這些等待線程纔會被喚醒,從而從新爭奪鎖。wait()/notify()這些都更傾向於底層的實現開發,而Condition接口更傾向於代碼實現的等待通知效果。二者之間的區別與共通點以下:

領統Java併發半壁江山的AQS你真的懂了嗎?

6.1 條件等待隊列

條件等待隊列,指的是 Condition 內部本身維護的一個隊列,不一樣於 AQS 的 同步等待隊列。它具備如下特色:

要加入條件等待隊列的節點,不能在 同步等待隊列。

從 條件等待隊列 移除的節點,會進入同步等待隊列。

一個鎖對象只能有一個同步等待隊列,但能夠有多個條件等待隊列。

這裏以 AbstractQueuedSynchronizer 的內部類 ConditionObject 爲例(Condition 的實現類)來分析下它的具體實現過程。首先來看該類內部定義的幾個成員變量:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

它採用了 AQS 的 Node 節點構造(前面說過Node類有nextWaiter屬性),並定義了兩個成員變量:firstWaiterlastWaiter 。說明在 ConditionObject 內部也維護着一個本身的單向等待隊列。目前可知它的結構以下:

領統Java併發半壁江山的AQS你真的懂了嗎?

6.2 await、signal

好比有線程 一、2競爭鎖,下面來講下具體過程 線程1:

一、線程1 調用 reentrantLock.lock時,持有鎖。

二、線程1 調用 await 方法,進入條件等待隊列 ,同時釋放鎖。

三、線程1 獲取到線程2 signal 信號,從條件等待隊列進入同步等待隊列。

線程2:

一、線程2 調用 reentrantLock.lock時,因爲鎖被線程1 持有,進入同步等待隊列 。

二、因爲線程1 釋放鎖,線程2 從同步等待隊列 移除,獲取到鎖。

三、線程2 調用 signal 方法,致使線程 1 被喚醒。線程2 調用unlock ,線程1 獲取鎖後繼續下走。

6.2.1 await

當咱們看await、signal 的源碼時候不要認爲等待隊列跟同步隊列是徹底分開的,其實我的感受底層源碼是有點 HashMap 中的紅黑樹跟雙向鏈表的意思。

當調用await方法時候,說明當前任務隊列的頭節點拿着鎖呢,此時要把該Thread從任務隊列挪到等待隊列再喚醒任務隊列最前面排隊的運行任務,如圖:

領統Java併發半壁江山的AQS你真的懂了嗎?

  1. thread 表示節點存放的線程。
  2. waitStatus 表示節點等待狀態。條件等待隊列中的節點等待狀態都是 CONDITION,不然會被清除。
  3. nextWaiter 表示後指針。

6.2.2 signal

當咱們調用signal方法的時候,咱們要將等待隊列中的頭節點移出來,讓其去搶鎖,若是是公平模式就要去排隊了,流程如圖:

領統Java併發半壁江山的AQS你真的懂了嗎?

上面只是形象流程圖,若是從代碼級別看的話大體流程以下:

領統Java併發半壁江山的AQS你真的懂了嗎?

6.2.3 signalAll

signalAll 與 signal 方法的區別體如今 doSignalAll 方法上,前面咱們已經知道doSignal方法只會對等待隊列的頭節點進行操做,doSignalAll方法只不過將等待隊列中的每個節點都移入到同步隊列中,即通知當前調用condition.await()方法的每個線程:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null); // 循環
}

6.3 End

一個 Condition 對象就有一個單項的等待任務隊列。在一個多線程任務中咱們能夠new出多個等待任務隊列。好比咱們new出來兩個等待隊列。

private Lock lock = new ReentrantLock();
 private Condition FirstCond = lock.newCondition();
 private Condition SecondCond = lock.newCondition();

因此真正的AQS任務中通常是一個任務隊列N個等待隊列的,所以咱們儘可能調用signal而少用signalAll,由於在指定的實例化等待隊列中只有一個能夠拿到鎖的。

Synchronized 中的 wait 跟 notify 底層代碼的等待隊列只有一個,多個線程調用wait的時候咱們是沒法知道頭節點是那個具體線程的。所以只能notifyAll

寫在最後

歡迎你們關注個人公衆號【風平浪靜如碼】,海量Java相關文章,學習資料都會在裏面更新,整理的資料也會放在裏面。

以爲寫的還不錯的就點個贊,加個關注唄!點關注,不迷路,持續更新!!!

相關文章
相關標籤/搜索