原理剖析(第 007 篇)CountDownLatch工做原理分析

原理剖析(第 007 篇)CountDownLatch工做原理分析

1、大體介紹

一、在前面章節瞭解了CAS、AQS後,想必你們已經對這塊知識有了深入的瞭解了;
二、而JDK中有一個關於計數同步器的工具類,它也是基於AQS實現的;
三、那麼本章節就和你們分享分析一下JDK1.8的CountDownLatch的工做原理; 

2、簡單認識CountDownLatch

2.1 何爲CountDownLatch?

一、CountDownLatch從英文字面上理解,count計數作down的減法動做,而Latch又是門閂的意思;

二、CountDownLatch是一種同步幫助,容許一個或多個線程等待,直到在其餘線程中執行的一組操做完成。;

三、CountDownLatch內部沒有所謂的公平鎖\非公平鎖的靜態內部類,只有一個Sync靜態內部類,CountDownLatch內部基本上也是經過sync.xxx之類的這種調用方式的;

四、CountDownLatch內部維護了一個虛擬的資源池,若是許可數不爲爲0一直線程阻塞等待,直到許可數爲0時才釋放繼續往下執行;

2.2 CountDownLatch的state關鍵詞

一、其實CountDownLatch的實現也偏偏很好利用了其父類AQS的state變量值;

二、初始化一個數量值做爲計數器的默認值,假設爲N,那麼當任何線程調用一次countDown則計數值減1,直到許可爲0時才釋放等待;

三、CountDownLatch,簡單大體意思爲:A組線程等待另外B組線程,B組線程執行完了,A組線程才能夠執行;

2.3 經常使用重要的方法

一、public CountDownLatch(int count)
   // 建立一個給定許計數值的計數同步器對象

二、public void await()
   // 入隊等待,直到計數器值爲0則釋放等待

三、public void countDown()
   // 釋放許可,計數器值減1,若計數器值爲0則觸發釋放無用結點
   
四、public long getCount() 
   // 獲取目前最新的共享資源計數器值

2.4 設計與實現僞代碼

一、獲取共享鎖:
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

	await{
		若是檢測中斷狀態發現被中斷過的話,那麼則拋出InterruptedException異常
		若是嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各類方式由AQS的子類實現 ),
		那麼就新增共享鎖結點經過自旋操做加入到隊列中,而後經過調用LockSupport.park進入阻塞等待,直到計數器值爲零才釋放等待
	}
	
	
二、釋放共享鎖:
    public void countDown() {
        sync.releaseShared(1);
    }
	
	release{
		若是嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各類方式由AQS的子類實現 ),
		那麼經過自旋操做完成阻塞線程的喚起操做
	}

2.五、CountDownLatch生活細節化理解

好比百米賽跑,我就以賽跑爲例生活化闡述該CountDownLatch原理:

一、場景:百米賽跑十人蔘賽,終點處有一個裁判計數;

二、開跑一聲槍響,十我的爭先恐後的向終點跑去,真的是振奮多秒,使人振奮;

三、當一我的到達終點,這我的就完成了他的賽跑事情了,就沒事一邊玩去了,那麼裁判則減去一我的;

四、隨着人員陸陸續續的都跑到了終點,最後裁判計數顯示還有0我的未到達,意思就是人員都達到了;

五、而後裁判就拿着登記的成績屁顛屁顛去輸入電腦登記了;

八、到此打止,這一系列的動做認爲是A組線程等待另外其餘組線程的操做,直到計數器爲零,那麼A則再幹其餘事情;

3、源碼分析CountDownLatch

3.一、CountDownLatch構造器

一、構造器源碼:
    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
	
二、建立一個給定許計數值的計數同步器對象,計數器值必須大於零,count值最後賦值給了state這個共享資源值;

3.二、Sync同步器

一、AQS --> Sync
				  
二、CountDownLatch內的同步器都是經過Sync抽象接口來操做調用關係的,細看會發現基本上都是經過sync.xxx之類的這種調用方式的;

3.三、await()

一、源碼:
    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * // 致使當前線程等待,直到計數器值減爲零則釋放等待,或者因爲線程被中斷也可致使釋放等待;
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
	
二、await此方法被調用後,則一直會處於等待狀態,其核心仍是因爲調用了LockSupport.park進入阻塞等待;
   當計數器值state=0時能夠打破等待現狀,固然還有線程被中斷後也能夠打破線程等待現狀;

3.四、acquireSharedInterruptibly(int)

一、源碼:
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 調用以前先檢測該線程中斷標誌位,檢測該線程在以前是否被中斷過
            throw new InterruptedException(); // 若被中斷過的話,則拋出中斷異常
        if (tryAcquireShared(arg) < 0) // 嘗試獲取共享資源鎖,小於0則獲取失敗,此方法由AQS的具體子類實現
            doAcquireSharedInterruptibly(arg); // 將嘗試獲取鎖資源的線程進行入隊操做
    }
	
二、因爲是實現同步計數器功能,因此tryAcquireShared首次調用一定小於0,則就順利了進入了doAcquireSharedInterruptibly線程等待;
   至於首次調用爲何會小於0,請看子類的實現,子類的實現判斷爲 "(getState() == 0) ? 1 : -1" ;

3.五、tryAcquireShared(int)

一、源碼:
	protected int tryAcquireShared(int acquires) {
		return (getState() == 0) ? 1 : -1; // 計數器值與零比較判斷,小於零則獲取鎖失敗,大於零則獲取鎖成功
	}
	
二、嘗試獲取共享鎖資源,可是在計數器CountDownLatch這個功能中,小於零則須要入隊,進入阻塞隊列進行等待;大於零則喚醒等待隊列,釋放await方法的阻塞等待;

3.六、doAcquireSharedInterruptibly(int)

一、源碼:
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
		// 按照給定的mode模式建立新的結點,模式有兩種:Node.EXCLUSIVE獨佔模式、Node.SHARED共享模式;
        final Node node = addWaiter(Node.SHARED); // 建立共享模式的結點
        boolean failed = true;
        try {
            for (;;) { // 自旋的死循環操做方式
                final Node p = node.predecessor(); // 獲取結點的前驅結點
                if (p == head) { // 若前驅結點爲head的話,那麼說明當前結點天然不用說了,僅次於老大以後的即是老二了咯
                    int r = tryAcquireShared(arg); // 並且老二也但願嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?但願仍是要有的,萬一實現了呢。。。
                    if (r >= 0) { // 若r>=0,說明已經成功的獲取到了共享鎖資源
                        setHeadAndPropagate(node, r); // 把當前node結點設置爲頭結點,而且調用doReleaseShared釋放一下無用的結點
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
					
					// 可是在await方法首次被調用會流轉到此,這個時候獲取鎖資源會失敗,即r<0,因此會進入是否須要休眠的判斷
					// 可是第一次進入休眠方法,由於被建立的結點waitStatus=0,因此會被修改一次爲SIGNAL狀態,再次循環一次
					// 而第二次循環進入shouldParkAfterFailedAcquire方法時,返回true就是須要休眠,則順利調用park方式阻塞等待
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否須要休息一下子
                    parkAndCheckInterrupt()) // 阻塞操做,正常狀況下,獲取不到共享鎖,代碼就在該方法中止了,直到被喚醒
					// 被喚醒後,發現parkAndCheckInterrupt()裏面檢測了被中斷了的話,則補上中斷異常,所以拋了個異常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
	
二、doAcquireSharedInterruptibly在實現計數器原理的時候,主要的乾的事情就是等待再等待,等到計數器值爲零時才甦醒;

3.七、countDown()

一、源碼:
    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1); // 釋放一個許可資源 
    }
	
二、釋放許可資源,也就是計數器值不斷的作減1操做,當計數器值爲零的時候,該方法將會釋放全部正在等待的線程隊列;
   至於爲何還會釋放全部,請看後續的releaseShared(int arg)講解;

3.八、releaseShared(int)

一、源碼:
    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖資源,此方法由AQS的具體子類實現
            doReleaseShared(); // 自旋操做,喚醒後繼結點
            return true; // 返回true代表全部線程已釋放
        }
        return false; // 返回false代表目前還沒釋放徹底,只要計數器值不爲零的話,那麼都會返回false
    }
	
二、releaseShared方法首先就判斷了tryReleaseShared(arg)的返回值,可是計數器值只要不爲零,都會返回false,所以releaseShared該方法就立馬返回false了;

三、因此當計數器值慢慢減至零時,則立馬返回true,那麼也就立馬會調用doReleaseShared釋放全部等待的線程隊列;

3.九、tryReleaseShared(int)

一、源碼:
	// CountDownLatch 的靜態內部類 Sync 類的 tryReleaseShared 方法	
	protected boolean tryReleaseShared(int releases) {
		// Decrement count; signal when transition to zero
		for (;;) { // 自旋的死循環操做方式
			int c = getState(); // 獲取最新的計數器值
			if (c == 0) // 若計數器值爲零,說明已經經過CAS操做減至零了,因此在併發中讀取到零時並不須要作什麼操做,所以返回false
				return false;
			int nextc = c-1; // 計數器值減1操做
			if (compareAndSetState(c, nextc)) // 經過CAS比較,順利狀況下設置成功返回true
				return nextc == 0; // 當經過計算操做獲得的nextc爲零時經過CAS修改爲功,那麼代表全部事情都已經作完,須要釋放全部等待的線程隊列
				
			// 若CAS失敗,想都不用想確定是因爲併發操做,致使CAS失敗,那麼惟一可作的就是下一次循環查看是否已經被其餘線程處理了
		}
	}
	
二、CountDownLatch的靜態內部類實現父類AQS的方法,用來處理如何釋放鎖,籠統的講,若返回負數則須要進入阻塞隊列,不然須要釋放全部等待隊列;

3.十、doReleaseShared()

一、源碼:
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) { // 自旋的死循環操做方式
            Node h = head; // 每次都是取出隊列的頭結點
            if (h != null && h != tail) { // 若頭結點不爲空且也不是隊尾結點
                int ws = h.waitStatus; // 那麼則獲取頭結點的waitStatus狀態值
                if (ws == Node.SIGNAL) { // 若頭結點是SIGNAL狀態則意味着頭結點的後繼結點須要被喚醒了
					// 經過CAS嘗試設置頭結點的狀態爲空狀態,失敗的話,則繼續循環,由於併發有可能其它地方也在進行釋放操做
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 喚醒頭結點的後繼結點
                }
				// 如頭結點爲空狀態,則把其改成PROPAGATE狀態,失敗的則多是由於併發而被改動過,則再次循環處理
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
			// 若頭結點沒有發生什麼變化,則說明上述設置已經完成,大功告成,功成身退
			// 若發生了變化,多是操做過程當中頭結點有了新增或者啥的,那麼則必須進行重試,以保證喚醒動做能夠延續傳遞
            if (h == head)                   // loop if head changed
                break;
        }
    }
	
二、主要目的是釋放線程中全部等待的隊列,當計數器值爲零時,此方法立刻會被調用,經過自旋方式輪詢幹掉全部等待的隊列;

4、總結

一、有了分析AQS的基礎後,再來分析CountDownLatch便快了不少;

二、在這裏我簡要總結一下CountDownLatch的流程的一些特性:
	• 管理一個大於零的計數器值;
	• 每countDown一次則state就減1一次,直到許可證數量等於0則釋放隊列中全部的等待線程;
	• 也能夠經過countDown/await組合一塊兒使用,來實現CyclicBarrier的功能;

5、下載地址

https://gitee.com/ylimhhmily/SpringCloudTutorial.gitnode

SpringCloudTutorial交流QQ羣: 235322432git

SpringCloudTutorial交流微信羣: 微信溝通羣二維碼圖片連接微信

歡迎關注,您的確定是對我最大的支持!!!併發

相關文章
相關標籤/搜索