淺析AQS

#前言java

很久沒寫blog了,最近學車,時間真的少了不少,花了一些時間寫了一篇AQS,請你們指正。node

概述

翻閱AbstractQueuedSynchronizer的源碼,會發現以下注釋:併發

Pprovides a framework for implementing blocking locks and related  
synchronizers (semaphores,  events, etc) that rely on   
first-in-first-out (FIFO) wait queues.

AbstractQueuedSynchronizer提供一個基於FIFO隊列的框架,該框架用於實現阻塞鎖和相關同步器(例如:semaphores)。框架

如此可知,AbstractQueuedSynchronizer能夠視爲JDK同步器的框架,理解它,有助於理解JDK的同步器。ide

框架說明

本人依據JDK源碼中的註釋結合併發經驗,總結了以下AQS框架說明:工具

  • AQS是依賴狀態進行同步操做的,其內部使用一個整形變量state,來表示同步狀態,此狀態值依據具體的同步器語義實現。例如:在CountDownLatch中state即爲須要等待的線程數。ui

  • AQS的子類必須定義在獲取和釋放上對應的狀態值。對於AQS狀態變量的操做必須使用getState,setState,compareAndSetState 三個原子方法。線程

  • AQS 提供了互斥與共享兩種模式,AbstractQueuedSynchronizer類中的final方法已完善隊列和阻塞機制,子類僅須要實現protected方法,設計

    • protected方法說明:
      tryAcquire 嘗試獲取互斥鎖或許可
      tryRelease 嘗試釋放互斥鎖或許可
      tryAcquireShared 嘗試獲取共享鎖或許可
      tryReleaseShared 嘗試釋放共享鎖或許可
      isHeldExclusively 是否持有互斥鎖或許可
  • AQS的子類應該被定義爲非公共的內部助手類,用於實現它們的封閉類的同步屬性code

  • AQS在序列化時僅序列化狀態,在默認狀況下會獲得一個空的線程隊列。子類一般須要實現readObject方法,用來設置初始狀態。

  • hasQueuedPredecessors在設計公平的同步器時使用,若是該方法返回true,公平的同步器tryAcquire方法應該返回false

  • ConditionObject AQS的內部類,子類能夠用ConditionObject實現條件謂詞,若不須要實現條件謂詞能夠不實現。

核心操做

獨佔式

獲取

//JDK中的源碼
 public final void acquire(int state) {
        if (!tryAcquire(state) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), state))
            selfInterrupt();
    }

其對應代碼的語義爲:

while (!獲取不成功) {
     若是當前線程不在隊列中, 加入隊列
     阻塞當前線程
 }
 即阻塞直到獲取成功。

釋放

//JDK中的源碼
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

對應代碼的語義爲:

if (嘗試釋放成功)
      解鎖隊列中的第一個線程

共享式

獲取

若是當前節點爲隊列中的第一個節點,嘗試獲取,獲取成功進行head後續節點的設置。如獲取失敗維護先後節點關係,若須要阻塞進行阻塞,以後繼續重試。 若出現異常獲取失敗,取消當前節點獲取操做。

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)//嘗試獲取失敗
            doAcquireShared(arg);//進行共享式獲取
    }
    
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//當前節點的先驅節點爲head,即當前節點爲第一個
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//嘗試獲取成功
                        //向上冒泡,保證head節點的後驅節點爲未獲取到的節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //包裝爲 獲取失敗的節點 若須要中斷進行中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

釋放

確保級聯釋放,即便有其餘的線程正在進行的獲取/釋放。 這個過程一般嘗試釋放head的後續節點,若是他須要被釋放。 若是該節點不須要,會向下傳遞釋放動做,直到釋放成功。 此外,咱們必須在添加新節點時進行循環處理。不一樣於其餘操做 中釋放後續節點,咱們須要知道CAS是否重置了狀態,因此咱們須要重複檢查。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
     private void doReleaseShared() {
        /*
     

         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // 循環檢查狀態
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // 循環檢查CAS
            }
            if (h == head)                   // 循環檢查是否有新節點
                break;
        }
    }

JDK註釋中的例子

不可重入鎖(互斥模式)

在不可重入鎖Mutex中 ,咱們使用state=0表示釋放,state=1表示獲取

class Mutex implements Lock, java.io.Serializable {
 
    // 內部助手同步類Sync
    private static class Sync extends AbstractQueuedSynchronizer {
      // 當state=1表示獲取了獨佔鎖
      protected boolean isHeldExclusively() {
        return getState() == 1;
      }
 
      // 若是state=0,鎖是釋放狀態,嘗試獲取
      public boolean tryAcquire(int acquires) {
       assert acquires == 1; // acquires爲1表示進行獲取操做,其餘值無效
        if (compareAndSetState(0, 1)) {//CAS操做
          setExclusiveOwnerThread(Thread.currentThread());//設置鎖的持有者爲當前線程
         return true;
        }
        return false;
      }
 
      //嘗試釋放
     protected boolean tryRelease(int releases) {
        assert releases == 1; // 傳入的值爲1表示進行釋放,其餘值無效
        if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);//設置狀態爲0,表示鎖已釋放
      return true;
      }
 
     // 提供一個條件謂詞
     Condition newCondition() { return new ConditionObject(); }
 
     // 反序列化屬性
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
        s.defaultReadObject();
       setState(0); //設置初始狀態爲釋放
      }
   }
 
    // 全部同步操做 委託給Sync,下面咱們實現必要的鎖須要的操做
    private final Sync sync = new Sync();
 
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
  }

布爾閉鎖(共享模式)

state=0表示未被通知(等待中,不可共享獲取),state!=0表示被通知(可共享獲取)

class BooleanLatch {
    //內部同步器,state=0表示未被通知(等待中,不可共享獲取),state!=0表示被通知(可共享獲取)
    private static class Sync extends AbstractQueuedSynchronizer {
      boolean isSignalled() { return getState() != 0; }
    /**
    *tryAcquireShared 返回負值 獲取失敗
    *0 獲取成功其餘線程不能獲取
    *正值獲取成功,其餘線程也可獲取成功
    /

      protected int tryAcquireShared(int ignore) {
        return isSignalled() ? 1 : -1;
      }
 
      protected boolean tryReleaseShared(int ignore) {
        setState(1);
        return true;
      }
    }
 
   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }
  }

部分JDK同步器類簡要分析

分析JDK中的同步類,除了瞭解AQS外,還要知道每一個同步器中的state的語義是什麼,AQS上邊已經分析了,下面介紹下幾個同步器的state的語義。

ReentrantLock

ReentrantLock 只支持獨佔方式的獲取操做,它實現了tryAcquire,tryRelease和isHeldExclusively.
ReentrantLock的狀態用於存儲鎖獲取的操做次數,同一線程每獲取一次加1,每釋放一次減小1.
tryAcquire代碼簡要分析

  1. 當前狀態值(即鎖獲取的操做)>0,鎖的全部者非當前線程,獲取失敗
  2. 若是狀態值飽和,獲取失敗,即超過最大可獲取線程數
  3. 若是當前線程符合獲取鎖的條件,更新狀態值,若是須要設置鎖的持有者爲當前線程
protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                //  當前狀態值(即鎖獲取的操做)>0,鎖的全部者非當前線程,獲取失敗
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)//若是狀態值飽和,獲取失敗,即超過最大可獲取線程數
                    throw new Error("Maximum lock count exceeded");
                //符合獲取鎖的條件,更新狀態值,
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
			//設置鎖的持有者爲當前線程
            setExclusiveOwnerThread(current);
            return true;
        }

Semaphore與CountDownLatch

CountDownLatch同步狀態保存當前的計數值。相似BooleanLatch,不作分析。
Semaphore的同步狀態用於存儲當前能夠許可的數量。
Semaphore中的tryAcquireShared,tryReleaseShared
tryAcquireShared,獲取當前可用許可數量,若可用許可數量大於申請數量,經過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。
tryReleaseShared獲取當前可用許可數量,若是當前剩餘許可數量+釋放數量>0,過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。

/**
*tryAcquireShared,獲取當前可用許可數量,若可用許可數量大於申請數量,經過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。
*/
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
/**
*
*tryReleaseShared獲取當前可用許可數量,若是當前剩餘許可數量+釋放數量>0,過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。
*/
       protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

FutueTask

FutueTask的同步器狀態值以下:

NEW  = 0;  //初始狀態
COMPLETING   = 1;  //運行中
NORMAL       = 2;  //完成
EXCEPTIONAL  = 3;  //異常
CANCELLED    = 4;  //已取消
INTERRUPTING = 5;  //中斷中
INTERRUPTED  = 6;  //已中斷

可能的狀態轉換

NEW(初始狀態) -> COMPLETING(運行中) -> NORMAL (已完成)
NEW(初始狀態) -> COMPLETING(運行中) -> EXCEPTIONAL (異常)
NEW (初始狀態)-> CANCELLED (已取消)
NEW (初始狀態)-> INTERRUPTING (中斷中)-> INTERRUPTED (已中斷)

Future.get的語義很是相似閉鎖,若是發生了某件事件(由FutureTask表示的任務執行完成 或者取消),那麼線程能夠恢復執行,不然一致阻塞。

總結

AQS是JDK併發的框架,仔細理解有助於理解JDK的同步工具。 對於JDK的部分同步類,進行了簡要說明,詳細自行查閱源碼。 對於JDK同步類的源碼建議進行以下步驟: 1.理解同步器的狀態值的語義 2.該同步器使用是AQS的什麼模式, 是共享,互斥,仍是共享與互斥都有。 3.優先理解同步器的tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared方法,以後查看其它方法。

相關文章
相關標籤/搜索