AbstractQueuedSynchronizer

隊列同步器概述

隊列同步器AbstractQueuedSynchronizer(如下簡稱同步器),是用來構建鎖或者其餘同步組件的基礎框架,它使用了一個int成員變量表示同步狀態,經過內置的FIFO隊列來完成資源獲取線程的排隊工做,併發包的做者(Doug Lea)指望它可以成爲實現大部分同步需求的基礎。java

同步器是實現鎖(也能夠是任意同步組件)的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。能夠這樣理解兩者之間的關係:鎖是面向使用者的,它定義了使用者與鎖交互的接口(好比能夠容許兩個線程並行訪問),隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態關係、線程的排隊、等待與喚醒等底層操做node

隊列同步器的接口與示例

同步器的設計是基於模板方法模式的,也就是說,使用者須要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫方法。編程

重寫同步器指定的方法時,須要使用同步器提供的以下3個方法來訪問或修改同步狀態。安全

  • getState():獲取當前同步狀態,這個操做具備volatile讀取的內存語義。併發

  • setState(int newState):設置當前同步狀態,這個操做具備volatile寫入的內存語義。框架

  • compareAndSetState(int expect, int update):使用CAS設置當前狀態,該方法可以保證狀態設置的原子性,具備volatile讀取和寫入的內存語義。ui

同步器可重寫的方法與描述如表所示:線程

方法名稱 描述
protected boolean tryAcquire(int arg) 獨佔鎖獲取同步狀態,實現該方法須要查詢當前狀態並判斷同步狀態是否符合預期,而後再進行CAS設置同步狀態
protected boolean tryRelease(int arg) 獨佔式釋放同步狀態,等待獲取同步狀態的線程將有機會獲取同步狀態
protected int tryAcquireShared(int arg) 共享式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之,獲取失敗
protected boolean tryReleaseShare(int arg) 共享式釋放同步狀態
protected boolean isHeldExclusively 當前同步器是否在獨佔模式下被線程佔用,通常該方法表示是否被當前線程所獨佔

實現自定義同步組件時,將會調用同步器提供的模板方法,這些(部分)模板方法與描述如表所示:設計

方法名稱 描述
void acuire(int arg) 獨佔式獲取同步狀態,若是當前線程獲取同步狀態成功,則由該方法返回,不然,將會進入同步隊列等待,該方法將會調用重寫的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 與acquire(int arg)相同,可是該方法響應中斷,當前線程未獲取到同步狀態而進入同步隊列中,若是當前線程被中斷,則該方法會拋出InterruptedException並返回
boolean tryAcquireNanos(int arg,long nanos) 在acquireInterruptibly(int arg)基礎上增長了超時限制,若是當前線程在超時時間內沒有獲取到同步狀態,那麼將會返回false,若是獲取到了返回true
void acquireShared(int arg) 共享式的獲取同步狀態,若是當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨佔式獲取的主要區別是在同一時刻能夠有多個線程獲取到同步狀態
void acquireSharedInterruptibly(int arg) 與acquireShared(int arg)相同,該方法響應中斷
boolean tryAcquireSharedNanos(int arg,long nanos) 在acquireSharedInterruptibly(int arg)基礎上增長了超時限制
boolean release(int arg) 獨佔式的釋放同步狀態,該方法會在釋放同步狀態以後,將同步隊列中第一個節點包含的線程喚醒
boolean releaseShared(int arg) 共享式的釋放同步狀態
Collection getQueueThreads() 獲取等待在同步隊列上的線程集合

同步器提供的模板方法基本上分爲3類:獨佔式獲取與釋放同步狀態共享式獲取與釋放同步狀態查詢同步隊列中的等待線程狀況。自定義同步組件將使用同步器提供的模板方法來實現本身的同步語義。3d

只有掌握了同步器的工做原理才能更加深刻地理解併發包中其餘地併發組件,因此下面經過一個獨佔鎖的示例來深刻了解一下同步器的工做原理。

示例:獨佔鎖

顧名思義,獨佔鎖就是在同一時刻只能有一個線程獲取到鎖,而其餘獲取鎖的線程只能處於同步隊列中等待,只有獲取鎖的線程釋放了鎖,後繼的線程纔可以獲取鎖:

class Mutex implements Lock{
    //靜態內部類,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer{
        //是否處於佔用狀態
        protected boolean isHeldExclusively(){
            return getState() == 1;
        }
        //當狀態爲0的時候獲取鎖
        public boolean tryAcquire(int acquires){
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //釋放鎖,將狀態設置爲0
        protected boolean tryRelease(int releases){
            if(getState() == 0) throw new 
                IllegalMonitorStateException();
            setExcelusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //返回一個Condition,每一個codition都包含了一個codition隊列
        Codition newConditon() {return new CoditionObject();}
    }
    //僅須要將操做代理到Sync上便可
    private final Sync sync = new Sync();
    public void lock(){sync.acuire(1);}
    public boolean tryLock() {return sync.tryAcquire(1);}
    public void unlock(){sync.release(1);}
    public Condition newCodition(){return sysc.newCondition();}
    public boolean isLocked() {return sync.isHeldExclusively();}
    public boolean hasQueuedThreads(){return sync.hasQueuedThreads();}
    public void lockInterruptibly() throw InterruptedException{
        sync.acqurieInterruptibly(1);
    }
    public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException{
        return sync.tryAcquireNanos(1,unit.toNanos(timeout));
    }
}

上述示例中,獨佔鎖Mutex是一個自定義同步組件,它在同一時刻只容許一個線程佔用鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨佔式獲取和釋放同步狀態。在tryAcquire(int acquires)方法中,若是通過CAS設置成功(同步狀態設置爲1),則表明獲取了同步狀態,而在tryRelease(int releases)方法中只是將同步狀態重置爲0。

隊列同步器的實現分析

同步隊列

同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構形成爲一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。

同步隊列中的節點(Node)用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和後繼節點,節點的屬性類型與名稱以及描述如表所示:

方法名稱 描述
volatile int waitStatus; 等待狀態。 包含以下狀態。 1. CANCELLED,值爲1,因爲在同步隊列中等待的線程等待超市或者被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會變化 2. SINGAL,值爲-1,後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行 3. CONDITION,值爲-2,節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()方法後,該節點將會從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中 4. PROPAGATE,值爲-3,表示下一次共享式同步狀態獲取將會無條件地被傳播下去 5. INITIAL,值爲0,初始狀態
volatile Node prev; 前驅節點,當節點加入同步隊列時被設置(尾部設置)
volatile Node next; 後繼節點
Node nextWaiter; 等待隊列中的後繼節點。若是當前節點是共享的,那麼這個字段將是一個SHARED常量,也就是說節點類型(獨佔和共享)和等待隊列中的後繼節點共用同一個字段
volatile Thread thread; 獲取同步狀態的線程

節點是構成同步隊列的基礎,同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態和線程將會成爲節點加入該隊列的尾部,同步隊列的基本結構如圖所示:

當一個線程成功地獲取了同步狀態(或者鎖),其餘線程將沒法獲取到同步狀態,轉而被構形成爲節點並加入到同步隊列中,而這個加入隊列地過程必需要保證線程安全,所以同步器提供了一個基於CAS地設置尾節點地方法:compareAndSetTail(Node expect,Node update),它須要傳遞當前線程「認爲」的尾節點和當前節點,只有設置成功後,當前節點才正式與以前的尾節點創建關聯。

同步器將節點加入到同步隊列的過程如圖所示:

同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,過程如圖所示:

上圖中,設置首節點是經過獲取同步狀態成功的線程來完成的,因爲只有一個線程可以成功獲取到同步狀態,所以設置頭節點的方法並不須要使用CAS來保證,它只須要將首節點設置成爲原首節點的後繼節點並斷開原首節點的next引用便可。

獨佔式同步狀態獲取與釋放

經過調用同步器的acquire(int arg)方法能夠獲取同步狀態,該方法對中斷不敏感,也就是因爲線程獲取同步狀態失敗後進入同步隊列中,後繼對線程進行中斷操做時,線程不會從同步隊列中移出,該方法代碼以下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

其主要邏輯是:首先調用自定義同步器實現的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態,若是同步狀態獲取失敗,則構造同步節點(獨佔式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)並經過addWaiter(Node node)方法將該節點加入到同步隊列的尾部,最後調用acquireQueued(Node node,int arg)方法,使得該節點以「死循環」的方式獲取同步狀態。若是獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞現場被中斷來實現。

下面分析一個相關工做。首先是節點的構造以及加入同步隊列,如代碼所示。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速嘗試在尾部添加
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
       Node t = tail;
       if (t == null) { // Must initialize
           if (compareAndSetHead(new Node()))
               tail = head;
           } else {
           node.prev = t;
           if (compareAndSetTail(t, node)) {
              t.next = node;
              return t;
            }
        }
    }
}

上述代碼經過使用compareAndSetTail(Node expect,Node update)方法來確保節點可以被線程安全的添加。若是使用一個普通的LinkedList來維護節點之間的關係,那麼當一個線程獲取了同步狀態,而其餘多個線程因爲調用tryAcquire(int arg)方法獲取同步狀態失敗而併發地被添加到LinkedList時,LinkedList將難以保證Node的正確添加,最終的結果多是節點的數量有誤差,並且順序也是混亂的。

enq(final Node node)方法中,同步器經過「死循環」來保證節點的正確添加,在「死循環」中只有經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法返回,不然,當前線程不斷地嘗試設置。能夠看出,enq(final Node node)方法將併發節點的請求經過CAS變得「串行化」了。

節點進入同步隊列以後,就進入了一個自旋的過程,每一個節點(或者說每一個線程)都在自省地觀察,當條件知足,獲取到了同步狀態,就能夠從這個自選過程當中退出,不然依舊留在這個自旋過程當中(並會阻塞節點地線程),如代碼所示:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
               setHead(node);
               p.next = null; // help GC
               failed = false;
               return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               interrupted = true;
            }
     } finally {
        if (failed)
            cancelAcquire(node);
        }
}

acquireQueued(final Node node,int arg)方法中,當前線程在「死循環」中嘗試獲取同步狀態,而只有前驅結點是頭節點纔可以獲取同步狀態,這是爲何?緣由有兩個,以下。

第一,頭節點是成功獲取到同步狀態的節點,而頭節點的線程釋放了同步狀態以後,將會喚醒其後繼節點,後繼節點的線程被喚醒後須要檢查本身的前驅節點是不是頭節點。

第二,維護同步隊列的FIFIO原則。該方法中,節點自旋獲取同步狀態的行爲如圖所示:

上圖中,因爲非首節點線程前驅節點出隊或者被中斷而從等待狀態返回,隨後檢查本身的前驅是不是頭節點,若是是則嘗試獲取同步狀態。能夠看到節點和節點之間在循環檢查的過程當中基本不相互通訊,而是簡單地判斷本身的前驅是否爲頭節點,這樣就使得節點的釋放規則符合FIFO,而且也便於對過早通知的處理(過早通知是指前驅節點不是頭節點的線程因爲中斷而被喚醒)。

獨佔式同步狀態獲取流程,也就是acquire(int arg)方法調用線程,如圖所示:

在上圖中,前驅節點爲頭節點且可以獲取同步狀態的判斷條件和線程進入等待狀態是獲取同步狀態的自選過程。當同步狀態獲取成功以後,當前線程從acqurie(int arg)方法返回,若是對於鎖這種併發組件而言,表明着當前線程獲取了鎖。

當前線程獲取同步狀態並執行了相應邏輯以後,就須要釋放同步狀態,使得後續節點可以繼續獲取同步狀態。經過調用同步器的release(int arg)方法能夠釋放同步狀態,該方法在釋放了同步狀態以後,會喚醒其後繼節點(進而使後繼節點從新嘗試獲取同步狀態)。該方法代碼以下所示:

public final boolean release(int arg){
    if(tryRelease(arg)){
        Node h = head;
        if(h != null && h.waitStatus != 0){
            unparkSuccessor(h);
        }
        return true;
    }
    return false;
}

該方法執行時,會喚醒頭節點的後繼節點線程,uparkSuccessor(Node node)方法使用LockSupport來喚醒處於等待狀態的線程。

作個總結:在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成爲獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,而後喚醒頭節點的後繼節點。

共享同步狀態獲取與釋放

共享式獲取與獨佔式獲取最主要的區別在於同一時刻可否有多個線程同時獲取到同步狀態。

上圖中,左半部分,共享式訪問資源時,其餘共享式的訪問均被容許,而獨佔式訪問被阻塞,右半部分是獨佔式訪問資源時,同一時刻其餘訪問均被阻塞。

經過調用同步器的acquireShared(int arg)方法能夠共享式地獲取同步狀態,該方法代碼以下:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
               int r = tryAcquireShared(arg);
               if (r >= 0) {
                  setHeadAndPropagate(node, r);
                  p.next = null; // help GC
                  if (interrupted)
                      selfInterrupt();
                      failed = false;
                      return;
                   }
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
             }
   } finally {
         if (failed)
             cancelAcquire(node);
   }
}

在acquireShared(int arg)方法中,同步器調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,tryAcquireShared(int arg)方法返回值爲int類型,當返回值大於等於0時,表示可以獲取到同步狀態。所以,在共享式獲取地自旋過程當中,成功獲取到同步狀態並退出自旋地條件是tryAcquireShared(int arg)方法返回值大於等於0.能夠看到,在doAcquireShared(int arg)地自旋過程當中,若是當前節點地前驅爲頭節點時,嘗試獲取同步狀態,若是返回值大於等於0,表示該次獲取同步狀態成功並從自旋過程當中退出。

獨佔式超時獲取同步狀態

響應中斷的同步狀態獲取過程。在Java 5以前,當一個線程獲取不到鎖而被阻塞在synchronized以外時,對該線程進行中斷操做,此時該過程的中斷標誌位會被修改,但線程依舊會阻塞在synchronized上,等待着獲取鎖。在Java 5中,同步器提供了acquireInterruptibly(int arg)方法,這個方法在等待獲取同步狀態時,若是當前線程被中斷,會馬上返回,並拋出InterruptedException。

超市獲取同步狀態過程能夠被視做響應中斷獲取同步狀態過程的「加強版」,doAcquireNanos(int arg,long nanosTimeout)方法在支持響應中斷的基礎上,增長了超時獲取的特性。針對超市獲取,主要須要計算出須要睡眠的時間間隔nanosTimeout,爲了防止過早通知,nanosTimeout計算公式爲:nanosTimeout = now -lastTime,其中now爲當前喚醒時間,lastTime爲上次喚醒時間。

上圖中能夠看出,獨佔式超時獲取狀態doAcquireNanos(int arg,long nanosTimeout)和獨佔式獲取同步狀態acquire(int args)在流程上很是類似,其主要區別在於未獲取到同步狀態時的處理邏輯。

參考資料

相關文章
相關標籤/搜索