聊聊併發(十三)—AQS框架深刻分析

#0 系列目錄#java

#1 什麼是同步器# 多線程併發的執行,之間經過某種 共享 狀態來同步,只有當狀態知足 xxxx 條件,才能觸發線程執行 xxxx 。這個共同的語義能夠稱之爲同步器。能夠認爲全部的鎖機制均可以基於同步器定製來實現的node

而juc(java.util.concurrent)裏的思想是 將這些場景抽象出來的語義經過統一的同步框架來支持。juc 裏全部的這些鎖機制都是基於 AQS ( AbstractQueuedSynchronizer )框架上構建的。下面簡單介紹下 AQS( AbstractQueuedSynchronizer )。windows

咱們來看下java.util.concurrent.locks大體結構:多線程

輸入圖片說明

上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,爲什麼圖中沒有用UML線表示呢,這是每一個Lock實現類都持有本身內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。爲什麼要實現不一樣的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制。下文會舉例說明不一樣同步器內的Sync與state實現。併發

#2 AQS框架如何構建同步器# ##2.1 同步器的基本功能## 一個同步器至少須要包含兩個功能:框架

  1. 獲取同步狀態:若是容許,則獲取鎖,若是不容許就阻塞線程,直到同步狀態容許獲取。
  2. 釋放同步狀態:修改同步狀態,而且喚醒等待線程。

根據做者論文, aqs 同步機制同時考慮了以下需求:ui

  1. 獨佔鎖和共享鎖兩種機制
  2. 線程阻塞後,若是須要取消,須要支持中斷
  3. 線程阻塞後,若是有超時要求,應該支持超時後中斷的機制

##2.2 同步狀態的獲取與釋放## AQS實現了一個同步器的基本結構,下面以獨佔鎖與共享鎖分開討論,來講明AQS怎樣實現獲取、釋放同步狀態。this

###2.2.1 獨佔模式### 獨佔獲取: tryAcquire 自己不會阻塞線程,若是返回 true 成功就繼續,若是返回 false 那麼就阻塞線程並加入阻塞隊列。操作系統

public final void acquire(int arg) {  
  
        if (!tryAcquire(arg) &&  
  
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//獲取失敗,則加入等待隊列  
  
            selfInterrupt();  
  
    }

獨佔且可中斷模式獲取:支持中斷取消.net

public final void acquireInterruptibly(int arg) throws InterruptedException {  
  
        if (Thread.interrupted())  
            throw new InterruptedException();  
        if (!tryAcquire(arg))  
            doAcquireInterruptibly(arg);  

    }

獨佔且支持超時模式獲取: 帶有超時時間,若是通過超時時間則會退出。

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {  
  
        if (Thread.interrupted())  
  
            throw new InterruptedException();  
  
        return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);  
    }

獨佔模式釋放:釋放成功會喚醒後續節點

public final boolean release(int arg) {  
        if (tryRelease(arg)) {  
            Node h = head;  
            if (h != null && h.waitStatus != 0)  
                unparkSuccessor(h);  
            return true;  
        }  
        return false;  
    }

###2.2.2 共享模式### 共享模式獲取

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);  
    }

可中斷模式共享獲取

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  
        if (Thread.interrupted())  
            throw new InterruptedException();  
        if (tryAcquireShared(arg) < 0)  
            doAcquireSharedInterruptibly(arg);  
    }

共享模式帶定時獲取

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {  
        if (Thread.interrupted())  
            throw new InterruptedException();  
        return tryAcquireShared(arg) >= 0 ||  
            doAcquireSharedNanos(arg, nanosTimeout);  
    }

共享鎖釋放

public final boolean releaseShared(int arg) {  
        if (tryReleaseShared(arg)) {  
            doReleaseShared();  
            return true;  
        }  
        return false;  
    }

注意以上框架只定義了一個同步器的基本結構框架,基本方法裏依賴的 tryAcquire 、 tryRelease 、tryAcquireShared 、 tryReleaseShared 四個方法在 AQS 裏沒有實現,這四個方法不會涉及線程阻塞,而是由各自不一樣的使用場景根據狀況來定製:

protected boolean tryAcquire(int arg) {  
        throw new UnsupportedOperationException();
    }  
    protected boolean tryRelease(int arg) {  
        throw new UnsupportedOperationException();
    }  
    protected int tryAcquireShared(int arg) {  
        throw new UnsupportedOperationException();
    }  
    protected boolean tryReleaseShared(int arg) {  
        throw new UnsupportedOperationException();
    }

從以上源碼能夠看出AQS實現基本的功能: AQS雖然實現了acquire,和release方法是可能阻塞的,可是裏面調用的tryAcquire和tryRelease是由子類來定製的且是不阻塞的。能夠認爲同步狀態的維護、獲取、釋放動做是由子類實現的功能,而動做成功與否的後續行爲時有AQS框架來實現

##2.3 狀態獲取、釋放成功或失敗的後續行爲:線程的阻塞、喚醒機制## 有別於wait和notiry。這裏利用 jdk1.5 開始提供的 LockSupport.park() 和 LockSupport.unpark() 的本地方法實現,實現線程的阻塞和喚醒

獲得鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(這幾個native方法的實現代碼在hotspot\src\share\vm\prims\unsafe.cpp文件中,可是關鍵代碼park的最終實現是和操做系統相關的,好比windows下實現是在os_windows.cpp中,有興趣的同窗能夠下載jdk源碼查看)。喚醒一個被park()線程主要手段包括如下幾種:

  1. 其餘線程調用以被park()線程爲參數的unpark(Thread thread)。
  2. 其餘線程中斷被park()線程,如waiters.peek().interrupt();waiters爲存儲線程對象的隊列。
  3. 不知緣由的返回。

park()方法返回並不會報告究竟是上訴哪一種返回,因此返回後最好檢查下線程狀態,如:

LockSupport.park();  // 禁用當前線程
    if(Thread.interrupted){
        //doSomething
    }

AbstractQueuedSynchronizer(AQS)對於這點實現得至關巧妙,以下所示:

private void doAcquireSharedInterruptibly(int arg)throwsInterruptedException {  
        final Node node = addWaiter(Node.SHARED);  
        try {  
            for (;;) {  
                final Node p = node.predecessor();  
                if (p == head) {  
                    int r = tryAcquireShared(arg);  
                    if (r >= 0) {  
                        setHeadAndPropagate(node, r);  
                        p.next = null; // help GC  
                        return;  
                    }  
                }  
                //parkAndCheckInterrupt()會返回park住的線程在被unpark後的線程狀態,若是線程中斷,跳出循環。  
                if (shouldParkAfterFailedAcquire(p, node) &&  
                    parkAndCheckInterrupt())  
                    break;  
             }  
         } catch (RuntimeException ex) {  
             cancelAcquire(node);  
             throw ex;  
         }  
  
         // 只有線程被interrupt後纔會走到這裏  
         cancelAcquire(node);  
         throw new InterruptedException();  
    }  
  
    //在park()住的線程被unpark()後,第一時間返回當前線程是否被打斷  
    private final boolean parkAndCheckInterrupt() {  
        LockSupport.park(this);  
        return Thread.interrupted();  
    }

##2.4 線程阻塞隊列的維護## 阻塞線程節點隊列 CHL Node queue 。

根據論文裏描述, AQS 裏將阻塞線程封裝到一個內部類 Node 裏。並維護一個 CHL Node FIFO 隊列CHL隊列是一個非阻塞的 FIFO 隊列,也就是說往裏面插入或移除一個節點的時候,在併發條件下不會阻塞,而是經過自旋鎖和 CAS 保證節點插入和移除的原子性。實現無鎖且快速的插入。CHL隊列對應代碼以下:

/** 
     * CHL頭節點 
     */   
    private transient volatile Node head;
    /** 
     * CHL尾節點 
     */  
    private transient volatile Node tail;

Node節點是對Thread的一個封裝,結構大概以下:

static final class Node {  
        /** 表明線程已經被取消*/  
        static final int CANCELLED =  1;  
        /** 表明後續節點須要喚醒 */  
        static final int SIGNAL    = -1;  
        /** 表明線程在等待某一條件/ 
        static final int CONDITION = -2; 
        /** 標記是共享模式*/  
        static final Node SHARED = new Node();  
        /** 標記是獨佔模式*/  
        static final Node EXCLUSIVE = null;  
  
        /** 
         * 狀態位 ,分別可使CANCELLED、SINGNAL、CONDITION、0 
         */  
        volatile int waitStatus;  
  
        /** 
         * 前置節點 
         */  
        volatile Node prev;  
  
        /** 
         * 後續節點 
         */  
        volatile Node next;  
  
        /** 
         * 節點表明的線程 
         */  
        volatile Thread thread;  
  
        /** 
         *鏈接到等待condition的下一個節點 
         */  
        Node nextWaiter;
    }

##2.5 小結## 從源碼能夠看出AQS實現基本的功能:

  1. 同步器基本範式、結構
  2. 線程的阻塞、喚醒機制
  3. 線程阻塞隊列的維護

AQS雖然實現了acquire,和release方法,可是裏面調用的tryAcquire和tryRelease是由子類來定製的。能夠認爲同步狀態的維護、獲取、釋放動做是由子類實現的功能,而動做成功與否的後續行爲時有AQS框架來實現,還有如下一些私有方法,用於輔助完成以上的功能:

final boolean acquireQueued(final Node node, int arg) :申請隊列
private Node enq(final Node node) : 入隊
private Node addWaiter(Node mode) :以mode建立建立節點,並加入到隊列
private void unparkSuccessor(Node node) : 喚醒節點的後續節點,若是存在的話。
private void doReleaseShared() :釋放共享鎖
private void setHeadAndPropagate(Node node, int propagate):設置頭,而且若是是共享模式且propagate大於0,則喚醒後續節點。
private void cancelAcquire(Node node) :取消正在獲取的節點
private static void selfInterrupt() :自我中斷
private final boolean parkAndCheckInterrupt() : park 並判斷線程是否中斷

#3 AQS在各同步器內的Sync與State實現# ##3.1 什麼是state機制## 提供 volatile 變量 state; 用於同步線程之間的共享狀態。經過 CAS 和 volatile 保證其原子性和可見性。對應源碼裏的定義:

/** 
 * 同步狀態 
 */  
private volatile int state;  
  
/** 
 *cas 
 */  
protected final boolean compareAndSetState(int expect, int update) {  
    // See below for intrinsics setup to support this  
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  
}

##3.2 不一樣實現類的Sync與State## 基於AQS構建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不同而已

###3.2.1 ReentrantLock### 須要記錄當前線程獲取原子狀態的次數,若是次數爲零,那麼就說明這個線程放棄了鎖(也有可能其餘線程佔據着鎖從而須要等待),若是次數大於1,也就是得到了重進入的效果,而其餘線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。如下爲ReetranLock的FairSync的tryAcquire實現代碼解析。

//公平獲取鎖
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //若是當前重進入數爲0,說明有機會取得鎖
    if (c == 0) {
        //若是是第一個等待者,而且設置重進入數成功,那麼當前線程得到鎖
        if (isFirst(current) &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //若是當前線程自己就持有鎖,那麼疊加劇進入數,而且繼續得到鎖
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //以上條件都不知足,那麼線程進入等待隊列。
    return false;
}

###3.2.2 Semaphore### 則是要記錄當前還有多少次許可可使用,到0,就須要等待,也就實現併發量的控制,Semaphore一開始設置許可數爲1,實際上就是一把互斥鎖。如下爲Semaphore的FairSync實現:

protected int tryAcquireShared(int acquires) {
    Thread current = Thread.currentThread();
    for (;;) {
         Thread first = getFirstQueuedThread();
         //若是當前等待隊列的第一個線程不是當前線程,那麼就返回-1表示當前線程須要等待
         if (first != null && first != current)
              return -1;
         //若是當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那麼先取得semaphore還有幾個許可證,而且減去當前線程須要的許可證獲得剩下的值
         int available = getState();
         int remaining = available - acquires;
         //若是remining<0,那麼反饋給AQS當前線程須要等待,若是remaining>0,而且設置availble成功設置成剩餘數,那麼返回剩餘值(>0),也就告知AQS當前線程拿到許可,能夠繼續執行。
         if (remaining < 0 ||compareAndSetState(available, remaining))
             return remaining;
   }
}

###3.2.3 CountDownLatch### 閉鎖則要保持其狀態,在這個狀態到達終止態以前,全部線程都會被park住,閉鎖能夠設定初始值,這個值的含義就是這個閉鎖須要被countDown()幾回,由於每次CountDown是sync.releaseShared(1),而一開始初始值爲10的話,那麼這個閉鎖須要被countDown()十次,纔可以將這個初始值減到0,從而釋放原子狀態,讓等待的全部線程經過。

//await時候執行,只查看當前須要countDown數量減爲0了,若是爲0,說明能夠繼續執行,不然須要park住,等待countDown次數足夠,而且unpark全部等待線程  
public int tryAcquireShared(int acquires) {  
     return getState() == 0? 1 : -1;  
}  
  
//countDown 時候執行,若是當前countDown數量爲0,說明沒有線程await,直接返回false而不須要喚醒park住線程,若是不爲0,獲得剩下須要 countDown的數量而且compareAndSet,最終返回剩下的countDown數量是否爲0,供AQS斷定是否釋放全部await線程。  
public boolean tryReleaseShared(int releases) {  
    for (;;) {  
         int c = getState();  
         if (c == 0)  
             return false;  
         int nextc = c-1;  
         if (compareAndSetState(c, nextc))  
             return nextc == 0;  
    }
}

###3.2.4 FutureTask### 須要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續仍是park,而FutureTask的tryAcquireShared方法所作的惟一事情就是檢查狀態,若是是RUNNING狀態那麼讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,而且經過unpark喚醒正在等待的線程,返回結果

//get時待用,只檢查當前任務是否完成或者被Cancel,若是未完成而且沒有被cancel,那麼告訴AQS當前線程須要進入等待隊列而且park住
protected int tryAcquireShared(int ignore) {
     return innerIsDone()? 1 : -1;
}

//斷定任務是否完成或者被Cancel
boolean innerIsDone() {
    return ranOrCancelled(getState()) &&    runner == null;
}

//get時調用,對於CANCEL與其餘異常進行拋錯
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
    if (!tryAcquireSharedNanos(0,nanosTimeout))
        throw new TimeoutException();
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}

//任務的執行線程執行完畢調用(set(V v))
void innerSet(V v) {
     for (;;) {
        int s = getState();
        //若是線程任務已經執行完畢,那麼直接返回(多線程執行任務?)
        if (s == RAN)
            return;
        //若是被CANCEL了,那麼釋放等待線程,而且會拋錯
        if (s == CANCELLED) {
            releaseShared(0);
            return;
        }
        //若是成功設定任務狀態爲已完成,那麼設定結果,unpark等待線程(調用get()方法而阻塞的線程),以及後續清理工做(通常由FutrueTask的子類實現)
        if (compareAndSetState(s, RAN)) {
            result = v;
            releaseShared(0);
            done();
            return;
        }
    }
}

以上4個AQS的使用是比較典型,然而有個問題就是這些狀態存在哪裏呢?而且是能夠計數的。從以上4個example,咱們能夠很快獲得答案,AQS提供給了子類一個int state屬性。而且暴露給子類getState()和setState()兩個方法(protected)。這樣就爲上述狀態解決了存儲問題,RetrantLock能夠將這個state用於存儲當前線程的重進入次數Semaphore能夠用這個state存儲許可數CountDownLatch則能夠存儲須要被countDown的次數,而Future則能夠存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其餘的Synchronizer存儲他們的一些狀態。

AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法爲須要獨佔形式獲取的synchronizer實現的,好比線程獨佔ReetranLock的Sync,而tryAcquireShared和tryReleasedShared爲須要共享形式獲取的synchronizer實現

ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法(由於獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSync和NonfairSync實現);Semaphore內部類Sync則實現了tryAcquireShared和tryReleasedShared(與CountDownLatch類似,由於公平性問題,tryAcquireShared由其內部類FairSync和NonfairSync實現)。CountDownLatch內部類Sync實現了tryAcquireShared和tryReleasedSharedFutureTask內部類Sync也實現了tryAcquireShared和tryReleasedShared

相關文章
相關標籤/搜索