AQS-condition阻塞隊列 [自定義同步器框架]

一、AQS框架有同步隊列和阻塞隊列
    同步隊列前文已有敘述,阻塞隊列,本次將基於CyclicBarrier梳理講解,會比較易讀java

二、CyclicBarrier有四個關鍵操做lock.lock;trip.await;trip.signalAll;lock.unlock
    據此條線看一下condition阻塞隊列是如何發生做用的
    2-一、lock.lock條線   
    2-1-一、lock是直接使用的ReentrantLock,借其源碼一讀
        public void lock() {
            sync.lock();
        }
    2-1-二、sync是ReentrantLock的靜態內部類,屬裝飾模式下進行的包裝。
         具體區分NonfairSync和FairSync公平和非公平鎖。
        public ReentrantLock() {//從構造器來看,默認採用的是非公平鎖
            sync = new NonfairSync();
        }
    2-1-三、抓住lock線索,進入nonfairSync看一下其實現
    static final class NonfairSync extends Sync {
            final void lock() {
                if (compareAndSetState(0, 1))//第一次成功,後續走acquire排隊獲取
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);//會先行調用tryAcquire方法,其他邏輯同AQS自定義同步框架所述
            }node

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);//實如今父類Sync中
            }
        }
2-1-四、Sync的nonfairTryAcquire方法,查看如何獲取資源
    abstract static class Sync extends AbstractQueuedSynchronizer {
            abstract void lock();c#

            /**Performs non-fair tryLock.  tryAcquire isimplemented in subclasses*/
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();//持鎖線程
                int c = getState();
                if (c == 0) {//有資源
                    if (compareAndSetState(0, acquires)) {//更新state->排它鎖
                    setExclusiveOwnerThread(current);//設置本身爲持鎖線程
                    return true;
                    }
                } else if (current == getExclusiveOwnerThread()) {
                        //容許重入鎖,如CyclicBarrier的isBroken()方法
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;//非首次非重入,將排隊等待獲取資源
            }框架

        //lock.unlock是將會進入此節點,標記爲:ysma-unlock
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;//累減,兼容可重入鎖狀況
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {//徹底退出,釋放資源
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);//可重入鎖資源釋放
                return free;
            }ui

            final ConditionObject newCondition() {//阻塞隊列需實現此方法
                return new ConditionObject();
            }
        }
小結:CyclicBarrier中lock.lock()的實現是AQS同步sync隊列爭搶資源
        那麼將會是第一個進入的線程Thread1搶到資源並持鎖進入,其他的線程ThreadX將會阻塞等待this


2-二、trip.await條線 
    2-2-一、如上lock方法僅Thread1持鎖進入,執行trip.await方法後阻塞,
        ThreadX等線程將會阻塞在lock處,排隊等待獲取資源
            由CyclicBarrier得知Condition trip = lock.newCondition();
            Condition由AbstractQueuedSynchronizer#ConditionObject實現,部分相關源碼以下
        public class ConditionObject implements Condition, java.io.Serializable {
   
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;spa

            /**Adds a new waiter to wait queue. 注意:阻塞隊列沒有前驅節點的設置*/
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;//初始化-簡單粗暴
                else
                    t.nextWaiter = node;//掛到lastWaiter後面
                lastWaiter = node;
                return node;//waitStatus爲CONDITION狀態的節點
                }線程

            /**Removes and transfers all nodes*/
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                //ysma-transferForSignal 標記此處:將節點從阻塞隊列移往sync隊列
                    first = next;
                } while (first != null);
            }orm

            /**變換condition阻塞爲sync阻塞*/
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }blog

            /** Mode meaning to reinterrupt on exit from wait */
            private static final int REINTERRUPT =  1;
            /** Mode meaning to throw InterruptedException on exit from wait */
            private static final int THROW_IE    = -1;

            /**爲本次重點-這個要看的*/
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();//建立CONDITION塞節點
            //savedState結合acquireQueued方法可解讀爲將要獲取的資源數
                int savedState = fullyRelease(node);//標記ysma-fullyRelease
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {//不在sync隊列中則阻塞,標記ysma-isOnSyncQueue
                    LockSupport.park(this);//park 阻塞到這裏了
                /**標記ysma-checkInterruptWhileWaiting =>檢查等待過程當中是否發生了異常
                *  發生了異常要轉移node到sync隊列
                */

                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                }//退出while循環時node已在sync隊列中了,詳見checkInterruptWhileWaiting和isOnSyncQueue
            
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                /**這裏sync隊列是忽略中斷的,因此上文被喚醒後執行checkInterruptWhileWaiting發現
                *線程是已經被中斷了的,by:acquireQueued返回線程是否中斷
                *可是中斷了也得給老子鎖上,
                *已在sync隊列就是THROW_IE,還須要排隊等待獲取資源就是REINTERRUPT
                *REINTERRUPT時是須要再次入sync隊列的,最後在加入sync隊列後
                *在reportInterruptAfterWait(interruptMode)再次拋出一個異常
                *來修改sync隊列中對應node的狀態
                *
comment1
                */
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);//標記ysma-reportInterruptAfterWait
            }
        }
    2-2-二、await方法執行LockSupport.park(this)阻塞前先行執行了fullyRelease    和isOnSyncQueue
        下面咱們進入fullyRelease一觀,看看阻塞前的操做流程
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();//state=>資源數
                if (release(savedState)) {//釋放資源
                    failed = false;
                    return savedState;//返回savedState[釋放的資源數]
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)//呼應unlinkCancelledWaiters();
                    node.waitStatus = Node.CANCELLED;
            }
        }
        緊接着看一下release釋放什麼狀態
        public final boolean release(int arg) {
            if (tryRelease(arg)) {//釋放資源,參見Sync源碼標記處:ysma-unlock
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);//喚醒sync隊列頭節點的後繼節點
                return true;
            }
            return false;
        }
        先不急,讓咱們在看一眼isOnSyncQueue作了什麼在作個小結
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;//CONDITION狀態的節點確定不在sync隊列中
            if (node.next != null) // If has successor, it must be on queue
                return true;
            return findNodeFromTail(node);//見下
        }
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {//sync隊列中自尾到頭找詢node
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
        小結:A:由上可見Thread1持鎖進入,ThreadX阻塞等待;
                B:而後加入Thread1到阻塞隊列,釋放資源給ThreadX,使得ThreadX能夠得到資源,移除sync隊列;
                C:Thread1直到再次從阻塞隊列進入sync同步隊列前持續阻塞。
                C:ABC循環直到將全部線程從sync隊裏轉移到阻塞隊列。CyclicBarrier到達臨界點


2-三、trip.signalAll條線
        當全部線程都從sync隊列轉移到condition阻塞隊列後,不能一直阻塞,這裏以signalAll爲例進行梳理
        詳見2-2-一、ConditionObject#signalAll,進而由doSignalAll從firstWaiter到lastWaiter
            依次把CONDITION節點從condition阻塞隊列移除並掛到sync隊列中
        這裏咱們關注下標記:ysma-transferForSignal 看一下node是如何轉掛到sync隊列上的
        AbstractQueuedSynchronizer#transferForSignal
        final boolean transferForSignal(Node node) {
            /*If cannot change waitStatus, the node has been cancelled.*/
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;

            /*入sync隊列,自旋到成功爲止詳見前文AQS自定義同步框架*/
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                //入隊列後,線程取消或者更新狀態失敗,喚醒對應的線程,進行處理
                LockSupport.unpark(node.thread);
            return true;
        }
        小結:signal就是隻處理firstWaiter,signalAll就是從firstWaiter到lastWaiter挨個處理一遍
            基本步驟就是從condition隊列中把節點移除,而後把節點運輸到sync隊列上


2-四、lock.unlock條線
        sync同步隊列自行運轉,會依次釋放信號量喚醒線程,那麼就剩下最後的事情了,釋放鎖/資源
        public void unlock() {
            sync.release(1);//AbstractQueuedSynchronizer#release 以下
        }
        public final boolean release(int arg) {
            if (tryRelease(arg)) {//詳見上文Sync#tryRelease
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        總結:從CyclicBarrier能夠看出,整個condition的運用可分爲三部分
            一、利用sync隊列控制資源
            二、將線程節點從sync隊列轉移到condition隊列,釋放資源可是仍阻塞持鎖人/線程
            三、直到條件合適,將線程節點移回sync隊列,正常競爭資源直到退出
            四、in sync=>out sync=>in condition=>out condition=>in sync=>out sync


        PS:
        標記備註1:ysma-checkInterruptWhileWaiting
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                /**線程在等待期間中斷了,transferAfterCancelledWait
                *成功就是THROW_IE不然就是REINTERRUPT 做用見:
comment1*/
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
        final boolean transferAfterCancelledWait(Node node) {
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                /**修改waitStatus變動condition節點爲sync節點*/
                enq(node);//入sync隊列
                return true;
            }
            /*If we lost out to a signal(), then we can't proceed
             * until it finishes its enq().  Cancelling during an
             * incomplete transfer is both rare and transient, so just spin.
             * 若是CAS更新失敗,那麼必定有別個已經在操做了,此時只須要耐心自旋,等待node掛到sync隊列上便可
             */

            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
        標記備註2:ysma-reportInterruptAfterWait
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
            /**condition隊列裏面的中斷致使其提早進入sync隊列,             *須要再次發送中斷信號,以觸發sync同步流程的中斷處理機制             */         }

相關文章
相關標籤/搜索