一文看懂JUC之AQS機制

爲了解決原子性的問題,Java加入了鎖機制,同時保證了可見性和順序性。JDK1.5的併發包中新增了Lock接口以及相關實現類來實現鎖功能,比synchronized更加靈活,開發者可根據實際的場景選擇相應的實現類。本文注重講解其不一樣衍生類的使用場景以及其內部AQS的原理。併發問題引入以及synchronized相關的知識請看上一篇文章一文看懂Java鎖機制java

Lock特性

可重入

像synchronized和ReentrantLock都是可重入鎖,可重入性代表了鎖的分配機制是基於線程的分配,而不是基於方法調用的分配。數組

舉個簡單的例子,當一個線程已經獲取到鎖,當後續再獲取同一個鎖,直接獲取成功。但獲取鎖和釋放鎖必需要成對出現。bash

可響應中斷

當線程由於獲取鎖而進入阻塞狀態,外部是能夠中斷該線程的,調用方經過捕獲InterruptedException能夠捕獲中斷併發

可設置超時時間

獲取鎖時,能夠指定超時時間,能夠經過返回值來判斷是否成功獲取鎖oop

公平性

提供公平性鎖和非公平鎖(默認)兩種選擇。post

  • 公平鎖,線程將按照他們發出請求的順序來獲取鎖,不容許插隊;
  • 非公平鎖,則容許插隊:當一個線程發生獲取鎖的請求的時刻,若是這個鎖是可用的,那這個線程將跳過所在隊列裏等待線程並得到鎖。

考慮這麼一種狀況:A線程持有鎖,B線程請求這個鎖,所以B線程被掛起;A線程釋放這個鎖時,B線程將被喚醒,所以再次嘗試獲取鎖;與此同時,C線程也請求獲取這個鎖,那麼C線程極可能在B線程被徹底喚醒以前得到、使用以及釋放這個鎖。這是種共贏的局面,B獲取鎖的時刻(B被喚醒後才能獲取鎖)並無推遲,C更早地獲取了鎖,而且吞吐量也得到了提升。在大多數狀況下,非公平鎖的性能要高於公平鎖的性能。性能

另外,這個公平性是針對線程而言的,不能依賴此來實現業務上的公平性,應該由開發者本身控制,好比經過FIFO隊列來保證公佈。ui

讀寫鎖

容許讀鎖和寫鎖分離,讀鎖與寫鎖互斥,可是多個讀鎖能夠共存,適用於讀頻次遠大於寫頻次的場景this

豐富的API

提供了多個方法來獲取鎖相關的信息,能夠幫助開發者監控和排查問題spa

isFair() //判斷鎖是不是公平鎖 isLocked() //判斷鎖是否被任何線程獲取了 isHeldByCurrentThread() //判斷鎖是否被當前線程獲取了 hasQueuedThreads() //判斷是否有線程在等待該鎖 getHoldCount() //查詢當前線程佔有lock鎖的次數 getQueueLength() // 獲取正在等待此鎖的線程數

鎖的使用

ReentrantLock

獨佔鎖的實現,擁有上面列舉的除讀寫鎖以外的全部特性,使用比較簡單

class X {
   // 建立獨佔鎖實例
   private final ReentrantLock lock = new ReentrantLock();
   // ...

   public void m() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       // 必需要釋放鎖,unlock與lock成對出現
       lock.unlock()
     }
   }
 }
複製代碼

ReentrantReadWriteLock

讀寫鎖的實現,擁有上面列舉的全部特性。而且寫鎖可降級爲讀鎖,反之不行。

class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
       // Must release read lock before acquiring write lock
       rwl.readLock().unlock();
       rwl.writeLock().lock();
       try {
         // Recheck state because another thread might have
         // acquired write lock and changed state before we did.
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
         // Downgrade by acquiring read lock before releasing write lock
         rwl.readLock().lock();
       } finally {
         rwl.writeLock().unlock(); // Unlock write, still hold read
       }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }
複製代碼

StampedLock

StampedLock也是一種讀寫鎖,提供兩種讀模式:樂觀讀和悲觀讀。樂觀讀容許讀的過程當中也能夠獲取寫鎖後寫入!這樣一來,咱們讀的數據就可能不一致,因此,須要一點額外的代碼來判斷讀的過程當中是否有寫入。

樂觀鎖的意思就是樂觀地估計讀的過程當中大機率不會有寫入,所以被稱爲樂觀鎖。反過來,悲觀鎖則是讀的過程當中拒絕有寫入,也就是寫入必須等待。顯然樂觀鎖的併發效率更高,但一旦有小几率的寫入致使讀取的數據不一致,須要能檢測出來,再讀一遍就行。

public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 獲取寫鎖
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 釋放寫鎖
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 得到一個樂觀讀鎖
        // 注意下面兩行代碼不是原子操做
        // 假設x,y = (100,200)
        double currentX = x;
        // 此處已讀取到x=100,但x,y可能被寫線程修改成(300,400)
        double currentY = y;
        // 此處已讀取到y,若是沒有寫入,讀取是正確的(100,200)
        // 若是有寫入,讀取是錯誤的(100,400)
        if (!stampedLock.validate(stamp)) { // 檢查樂觀讀鎖後是否有其餘寫鎖發生
            stamp = stampedLock.readLock(); // 獲取一個悲觀讀鎖
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 釋放悲觀讀鎖
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}
複製代碼

Condition

Condition成爲條件隊列或條件變量,爲一個線程掛起執行(等待)提供了一種方法,直到另外一線程通知某些狀態條件如今可能爲真爲止。 因爲對該共享狀態信息的訪問發生在不一樣的線程中,所以必須由互斥鎖對其其進行保護。

await方法:必須在獲取鎖以後的調用,表示釋放當前鎖,阻塞當前線程;等待其餘線程調用鎖的signal或signalAll方法,線程喚醒從新獲取鎖。

Lock配合Condition,能夠實現synchronized 與 對象(wait,notify)一樣的效果,來進行線程間基於共享變量的通訊。但優點在於同一個鎖能夠由多個條件隊列,當某個條件知足時,只須要喚醒對應的條件隊列便可,避免無效的競爭。

// 此類實現相似阻塞隊列(ArrayBlockingQueue)
class BoundedBuffer {
 final Lock lock = new ReentrantLock();
 final Condition notFull  = lock.newCondition(); 
 final Condition notEmpty = lock.newCondition(); 

 final Object[] items = new Object[100];
 int putptr, takeptr, count;

 public void put(Object x) throws InterruptedException {
   lock.lock();
   try {
     while (count == items.length)
       notFull.await();
     items[putptr] = x;
     if (++putptr == items.length) putptr = 0;
     ++count;
     notEmpty.signal();
   } finally {
     lock.unlock();
   }
 }

 public Object take() throws InterruptedException {
   lock.lock();
   try {
     while (count == 0)
       notEmpty.await();
     Object x = items[takeptr];
     if (++takeptr == items.length) takeptr = 0;
     --count;
     notFull.signal();
     return x;
   } finally {
     lock.unlock();
   }
 }
}
複製代碼

BlockingQueue

BlockingQueue阻塞隊列其實是一個生產者/消費者模型,當隊列長度大於指定的最大值,生產線程就會被阻塞;反之當隊列元素爲空時,消費線程就會被阻塞;同時當消費成功時,就會喚醒阻塞的生產者線程;生產成功就會喚醒消費者線程;

內部使用就是ReentrantLock + Condition來實現的,能夠參照上面的示例。

CountDownLatch

稱之爲倒計時器鎖,初始化指定數值,調用countDown能夠對數值減一,當數值減爲0時,就會喚醒全部由於調用await方法而阻塞的線程。

能夠達到一組線程等待另一組線程都完成任務的效果。

class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } } 複製代碼

CyclicBarrier

稱之爲同步屏障,它使得一組線程互相等待,直到到達某個公共屏障點。

初始化指定數值,調用await方法會使得線程阻塞,直到指定數量的線程都調用await方法時,全部被阻塞的線程會被喚醒,繼續執行。

與CountDownLatch的區別是,CountDownLatch是一組線程等待另一組線程,而CyclicBarrier是一組線程之間相互等待。

Semaphore

稱之爲信號量,與互斥鎖ReentrantLock用法相似,區別就是Semaphore共享的資源是多個,容許多個線程同時競爭成功。

AQS原理

AQS 是 AbstractQueuedSynchronizer的縮寫,中文 抽象隊列同步器,是構建各種鎖和同步器的基礎實現。內部維護了共享變量state (int類型) 和 雙向隊列 (包含頭指針和尾指針)

併發問題解決

原子性

Unsafe.compareAndSwapXXX 實現CAS更改 state 和 隊列指針 內部依賴CPU提供的原子指令

可見性與有序性

volatile 修飾 state 與 隊列指針 (prev/next/head/tail)

線程阻塞與喚醒

Unsafe.park Unsafe.parkNanos Unsafe.unpark

Unsafe類是在sun.misc包下,不屬於Java標準。提供了內存管理、對象實例化、數組操做、CAS操做、線程掛起與恢復等功能,Unsafe類提高了Java運行效率,加強了Java語言底層的操做能力。不少Java的基礎類庫,包括一些被普遍使用的高性能開發庫都是基於Unsafe類開發的,好比Netty、Cassandra、Hadoop、Kafka等

AQS內部有兩種模式:獨佔模式和共享模式

AQS 的設計是基於模板方法的,使用者須要繼承 AQS 並重寫指定的方法。不一樣的自定義同步器爭用共享資源的方式不一樣,好比可重入、公平性等都是子類來實現。

自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),由AQS內部處理。

獨佔模式

  • 只有一個線程都可以獲取到鎖
  • 鎖釋放後須要喚醒後繼節點

AQS提供的獨佔模式相關的方法

// 獲取獨佔鎖(線程阻塞直至獲取成功)
public final void acquire(int) // 獲取獨佔鎖,可被中斷 public final void acquireInterruptibly(int) // 獲取獨佔鎖,可被中斷 和 指定超時時間 public final boolean tryAcquireNanos(int, long) // 釋放獨佔鎖(釋放鎖後,將等待隊列中第一個等待節點喚醒 ) public final boolean release(int) 複製代碼

AQS子類須要實現的獨佔模式相關的方法

// 嘗試獲取獨佔鎖
protected boolean tryAcquire(int) // 嘗試釋放獨佔鎖 protected boolean tryRelease(int) 複製代碼

獲取獨佔鎖的流程

  • 調用子類tryAcquire嘗試獲取鎖,獲取成功,直接返回
  • 經過自旋CAS將當前線程封裝成節點加入隊列末尾
  • 循環等待或嘗試tryAcquire獲取鎖
    • 判斷前置節點若是爲head,則嘗試獲取鎖
    • 根據隊列中節點狀態,決定是否須要阻塞當前線程
    • tryAcquire獲取鎖成功後,將當前節點設置爲head 並 返回
  • 若是當前線程中斷或超時,則執行cancelAcquire
    • 將當前節點狀態置爲CANCELED,並從隊列刪除
    • 若是前置節點爲Head,則將後置節點喚醒

釋放獨佔鎖的流程

共享模式

  • 多個線程都可以獲取到鎖
  • 鎖釋放後須要喚醒後繼節點
  • 鎖獲取後若是還有資源須要喚醒後繼共享節點

AQS提供的共享模式相關的方法

// 獲取共享鎖(線程阻塞直至獲取成功)
public final void acquireShared(int) // 獲取共享鎖,可被中斷 public final acquireSharedInterruptibly(int) // 獲取共享鎖,可被中斷 和 指定超時時間 public final tryAcquireSharedNanos(int, long) // 獲取共享鎖 public final boolean releaseShared(int) 複製代碼

AQS子類須要實現的共享模式相關的方法

// 嘗試獲取共享鎖
protected int tryAcquireShared(int) // 嘗試釋放共享鎖 protected boolean tryReleaseShared(int) 複製代碼

獲取共享鎖的流程

  • 調用子類tryAcquireShared嘗試獲取鎖,獲取成功,直接返回
  • 經過自旋CAS將當前線程封裝成節點加入隊列末尾
  • 循環等待或嘗試tryAcquireShared獲取鎖
    • 判斷前置節點若是爲head,則嘗試獲取鎖
    • 根據隊列中節點狀態,決定是否須要阻塞當前線程
    • tryAcquireShared獲取鎖成功後,將當前節點設置爲head
      • 若是資源有剩餘或者原先的head節點狀態爲SIGNAL/PROPAGATE,則調用doReleaseShared
      • 若是當前head節點狀態爲SIGNAL,喚醒後繼節點
      • 若是當前head節點狀態爲ZERO,將head節點狀態置爲PROPAGATE
  • 若是當前線程中斷或超時,則執行cancelAcquire
    • 將當前節點狀態置爲CANCELED,並從隊列刪除
    • 若是前置節點爲Head,則將後置節點喚醒

釋放共享鎖的流程

等待隊列中節點的狀態變化

ReentrantLock示例

tryAcquire邏輯

tryRelease邏輯

相關文章
相關標籤/搜索