AbstractQueuedSynchronizer AQS 源碼分析之 如何應用

今天開始寫JDK開發包源碼分析的第一篇,後面每週更新一篇。先從最重要的AbstractQueuedSynchronizer (AQS 也叫隊列同步器,這名字應該是由於其內部是經過隊列實現同步狀態管理、線程排隊、等待與喚醒的) ,它是CountDownLacth、Semaphore、ReentrantLock、ReentrantReadWriteLock實現的基礎。CountDownLacth、Semaphore、ReentrantLock、ReentrantReadWriteLock 都採用組合的方式使用AQS,其內部分別有各自具體實現了AQS的Syn類。下面是對應UML圖。java

AQS是基本模板方法設計模式實現的。通常使用者根據具體須要,組合一個實現了AQS的子類,該類重寫了AQS中的相應的模板方法來知足使用者特定的同步語義。上面提到的CountDownLacth、Semaphore、ReentrantLock、ReentrantReadWriteLock都是採用這種方式實現的。 AQS提供的可重寫方法以下:設計模式

/* 獨佔模式(排它模式)獲取同步狀態,獲取同步狀態前要檢查當前線程是否能夠獲取。由於可能已被其它線程獲取,
 * 當同步狀態容許被線程屢次獲取時,獨佔模式只能被同一個線程屢次獲取。未獲取同步狀態的線程,將在同步隊列中
 * 等待。
 */
 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
/*獨佔模式釋放同步狀態,方法調用完後在同步隊列中等待獲取同步狀態的線程將有機會獲取同步狀態
 */
 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
/*共享模式獲取同步狀態,返回大於等於0,表示成功獲取同步狀態,反之表明失敗
 */
 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
/*共享模式釋放同步狀態
 */
 protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}
/*同步器是否在獨佔模式(排它模式)下被線程佔用,通常改方法表示同步狀態是否已被當前線程獲取過
 */
 protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}

默認狀況這個方法都直接拋出UnsupportedOperationException異常,使用者能夠根據狀況以獨佔或是共享的模式來完成具體的同步語義。併發

AQS內部的同步狀態是經過state表示的,state 被volatile修飾保證了不依賴了state自身的set, get方法操做的可見性。ide

/*The synchronization state.
  */
  private volatile int state;

AQS 提供了以下三個方法對應state狀態進行管理源碼分析

/*獲取當前同步狀態
*/
protected final int getState() {return state;}
/*設置當前同步狀態
*/
protected final void setState(int newState) {state = newState;}
/*使用Unsafe的CAS操做設置當前同步狀態,該方法操做能保證原子性
*/
protected final boolean compareAndSetState(int expect, int update) {
   // See below for intrinsics setup to support this
   return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

同步狀態 state的具體語義與具體使用者有關。如在Semaphore中state表示可獲取的信號量,當調用acquire方法成功獲取一次後state值將減一,用完調用release方法釋放後state的值將加一 ; 在CountDownLatch中 state表示調用await的方法的線程還要等待調用多少次countDown方法,該線程才能繼續執行; 在ReentrantLock中state表示,線程調用lock方法的次數。後面會分別分析源碼。測試

實現自定的同步組件時,將會調用AQS提供的模板方法,這些模板方法內部又將調用上面重寫的tryAcquire、 tryRelease、 tryAcquireShared 、tryReleaseShared isHeldExclusively方法。模板方法描述以下:ui

/*獨佔模式獲取同步狀態,該方法會先調用重寫的tryAcquire方法嘗試獲取同步狀態,若是當前線程成功獲取同步狀態,
 * 該方法直接返回,不然當前線程會被封裝成一個節點放入同步隊列中等待
 */
public final void acquire(int arg) {//現實省去...}
/*與acquire方法相似,但該方法能響應中斷,若是當前線程未能獲取到同步狀態,而在同步隊列中,當這個線程被中斷時,
 *則該拋出InterruptedException 異常並返回
 */
public final void acquireInterruptibly(int arg){//現實省去...}
/*在acquireInterruptibly方法上加了超時機制,若是當前線程在超過期間內沒有獲取同步狀態,那麼將返回false,
 *獲取到返回ture
 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout){//現實省去...}
/*共享式獲取同步狀態,該方法會先調用重寫的tryAcquireShared方法嘗試獲取同步狀態,若是當前線程成功獲取同步狀態,
 *該方法直接返回,不然當前線程會被封裝成一個節點放入同步隊列中等待
 */
public final void acquireShared(int arg) {//現實省去...}
/*以acquireShared方法相似,在其基礎上增長了響應中斷的功能
 */
public final void acquireSharedInterruptibly(int arg){//現實省去...}
/*在acquireSharedInterruptibly方法的基礎上增長了超時限制
 */
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout){//現實省去...}
/*獨佔模式釋放同步狀態,該方法會先調用重寫的tryRelease方法,若是釋放同步狀態成功,將同步隊列中第一個節點中
 *對應線程喚醒
 */
public final boolean release(int arg) {//現實省去...}
/*共享模式釋放同步狀態,該方法會先調用重寫的tryReleaseShared方法
 */
public final boolean releaseShared(int arg) {//現實省去...}

同步器提供的模板方法可分爲二大類:獨佔模式獲取與釋放同步狀態,共享模式獲取與釋放同步狀態。this

看了這麼多,會應用解決實際問題纔是關鍵。來個看一個JDK,AQS源碼中給的示例吧。線程

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/*
 * 互斥鎖,一次只能一個線程獲取到,同步狀態,獲取的線程不可重入
 * @author yeweigen
 */
public class Mutex implements Lock, Serializable {

    private static final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            if (getState() == 1) {
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryAcquire(int arg) {
           assert  arg == 1;
           /*此處使用原子操做是爲了確保併發時只有一個線程成功獲取同步狀態,
            *直接採用先getState方法後setState會有問題
            */
           if (compareAndSetState(0, 1)) {
               setExclusiveOwnerThread(Thread.currentThread());
               return true;
           }
           return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;
            if (getState() == 1) {
                setState(0);
                setExclusiveOwnerThread(null);
                return true;
            }
            return false;
        }
        Condition newCondition() { return new ConditionObject(); }
    }
    @Override
    public void lock() { sync.acquire(1); }
    @Override
    public boolean tryLock()  { return sync.tryAcquire(1); }
    @Override
    public void unlock()              { sync.release(1); }
    @Override
    public Condition newCondition()   { return sync.newCondition(); }
    @Override
    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
}

上面實現的互斥鎖Mutex,一次只能被一個線程持有。其內部類Sync經過重寫AQS的tryAcquire與tryRelease方法實現鎖獲取與釋放的語義。Mutex爲不可重入的鎖,下一篇文章會介紹何爲可重。Mutex 中的方法,則經過調用Sync實現。設計

測試代碼: 

/**
 * @author yeweigen
 */
public class MutexTest {

    public static void main (String [] args) {
        Mutex lock = new Mutex();
        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    Date now = new Date();
                    System.out.println(" thread1 running now:" + now);
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    Date now = new Date();
                    System.out.println(" thread2 running now:" + now);
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }
        }).start();
    }
}

可能的運行結果:

能夠看到一個線程晚於另一個線程3秒後才執行,由於第一個線程執行後sleep了3秒後才釋放了以前獲取的鎖。下一篇文章將分析,AQS內部的實現原理與對應源碼。

相關文章
相關標籤/搜索