Java併發編程之CountDownLatch源碼分析

CountDownLatch介紹

CountDownLatch是JDK1.5提供用來多線程併發同步的工具,可讓一個或多個線程等待另外一個線程執行完再執行。java

例子

private static CountDownLatch countDownLatch = new CountDownLatch(1);
	
	static class ThreadRunnable1 implements Runnable{
		
		@Override
		public void run() {
			countDownLatch.countDown();		
		}
		
	}
	public static void main(String[] args) throws InterruptedException {
		Thread thread1 = new Thread(new ThreadRunnable1());
		thread1.setName("線程1");
		thread1.start();
		countDownLatch.await();
	}

如上例子,先建立了計數器爲1的CountDownLatch,運行線程1調用countDown()方法,主線程調用await()方法並阻塞。調用countDown()會將計數器減1當計數器到達0,會喚醒await()的阻塞並繼續執行下面的邏輯。node

CountDownLatch原理分析

CountDownLatch定義了一個成員變量多線程

private final Sync sync;

Sync繼承至AbstractQueuedSynchronizer是CountDownLatch的內部類,提供了阻塞隊列的支持。CountDownLatch的UML關係圖以下併發

CountDownLatch主要利用AbstractQueuedSynchronizer(下文稱AQS)中的隊列和隊列的通知傳播的技術實現。建立CountDownLatch時先初始化設置的計數器,調用countDown()能夠減計數器,當計數器到0時會通知去喚醒調用了await()的線程,而調用了await()線程會被加入到AQS同步阻塞隊列,因此先喚醒AQS的同步阻塞隊列的頭節點線程,頭節點線程被喚醒後會一樣喚醒AQS同步阻塞隊列的下個節點,以此傳播喚醒全部調用了await()的線程。ide

下面來分析下CountDownLatch的2個主要方法countDown()和await()工具

countDown()源碼分析

countDown()源碼以下oop

public void countDown() {
        sync.releaseShared(1);
 }

countDown()是直接調用了AQS裏的releaseShared(1)方法咱們來看下releaseShared(1)方法源碼分析

releaseShared(1)源碼以下ui

public final boolean releaseShared(int arg) {
 		//計數器減一併返回剩餘計數器是否等於0
        if (tryReleaseShared(arg)) {
		//喚醒同步阻塞隊列中的頭節點的下個節點線程
            doReleaseShared();
            return true;
        }
        return false;
    }

先調用tryReleaseShared(arg)將計數器減一併返回剩餘計數器是否等於0,若是是調用doReleaseShared()喚醒同步阻塞隊列中的頭節點的下個節點線程。.net

tryReleaseShared(arg)方法是CountDownLatch內部類Sync的實現。咱們來看下tryReleaseShared(arg)的源碼。

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
		//獲取當前計時器
                int c = getState();
		//若是已經爲0則返回false,說明被以前線程減成0了
                if (c == 0)
                    return false;
		//計時器減一
                int nextc = c-1;
		//CSA更新值,若是更新失敗自旋更新直至成功
                if (compareAndSetState(c, nextc))
		//更新成功後剩餘的計時器是否爲0
                    return nextc == 0;
            }
        }

tryReleaseShared(arg)實現很簡單,獲取當前計時器減一併使用CAS方式更新,若是更新失敗自旋更新直至成功,最後返回剩餘的計時器是否爲0。

doReleaseShared()這個方法比較重要await()中也會調用,這裏先不作分析,下面會在await()方法中一塊兒分析。

await()源碼分析

await()源碼以下

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await()是直接調用了AQS裏的acquireSharedInterruptibly(1)方法咱們來看下acquireSharedInterruptibly(1)方法

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
	//判斷當前線程是否被中斷,中斷的話拋出異常
        if (Thread.interrupted())
            throw new InterruptedException();
	//判斷當前計數器是否爲0是的話返回1不然返回-1
        if (tryAcquireShared(arg) < 0)
	//加入到同步阻塞隊列等待被喚醒
            doAcquireSharedInterruptibly(arg);
    }

先判斷當前線程是否被中斷,中斷的話拋出異常。而後調用tryAcquireShared(arg)判斷當前計數器是否爲0是的話返回1不然返回-1,這裏若是計數器爲0了就表示計數器已經被countDown()減完了無需進行阻塞。若是計數器未被減完則調用 doAcquireSharedInterruptibly(arg)加入到同步阻塞隊列等待被喚醒。

tryAcquireShared(arg)方法也是CountDownLatch內部類Sync的實現,下面看下tryAcquireShared(arg)的源碼,源碼以下

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

實現很簡單取計數器判斷是否爲0是的話返回1不然返回-1。

下面doAcquireSharedInterruptibly(arg)來看下的源碼

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
	//以當前線程爲基礎增長共享節點到同步阻塞隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
		//獲取前一個節點
                final Node p = node.predecessor();
                if (p == head) {
		//判斷當前計數器是否爲0是的話返回1不然返回-1
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
			//設置頭節點並傳播喚醒同步阻塞隊列中的下個節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
			//阻塞線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
			刪除失敗節點
                cancelAcquire(node);
        }
    }

doAcquireSharedInterruptibly裏有個循環,這個循環的主要做用就是在線程喚醒後重試獲取鎖直到獲取鎖。node.predecessor()獲取當前線程節點的前一個節點,若是是頭節點,則當前線程嘗試獲取鎖,獲取鎖成功調用setHeadAndPropagate(node, r)設置頭節點並傳播通知。若是獲取失敗或者非頭節點則調用shouldParkAfterFailedAcquire(p, node)判斷是否須要阻塞等待,若是須要阻塞等待則調用parkAndCheckInterrupt()阻塞當前線程並讓出cup資源資質被前一個節點喚醒,若是線程被中斷則拋出InterruptedException()異常。

doAcquireSharedInterruptibly中的addWaiter(Node.SHARED),shouldParkAfterFailedAcquire(p, node)和parkAndCheckInterrupt()方法以前的文章已經描述過(連接)這裏就再也不贅述。

下面來看下setHeadAndPropagate(node, r)的實現

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
	//更新當前節點爲頭節點
        setHead(node);
	//這裏propagate必大於0
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
	//若是下個節點爲空或者是共享模式,就要傳播喚醒下個節點
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

主要邏輯就是設置當前節點爲頭節點,若是當前節點的下個節點不爲空就要傳播喚醒下個節點。

下面主要看下doReleaseShared()方法

private void doReleaseShared() {
        for (;;) {
		//1.獲取頭節點
            Node h = head;
		//2.頭節點不會空,並且頭節點有下個節點
            if (h != null && h != tail) {
                int ws = h.waitStatus;
		//3.若是頭節點狀態是SIGNAL,則喚醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
		//4.若是節點爲初始狀態則設置節點爲PROPAGATE狀態
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
		//5.若是頭節點改變,繼續循環
            if (h == head)                   // loop if head changed
                break;
        }
    }

整個循環就是利用樂觀鎖的方式改變節點狀態並喚醒下個節點線程。5個步驟以下

  • (1)獲取頭節點

  • (2)頭節點不會空,並且頭節點有下個節點

  • (3)步驟就是判斷頭節點狀態是否是SIGNAL狀態,這裏若是頭節點狀態是SIGNAL說明此節點已經被它的下個節點設置爲SIGNAL狀態,須要喚醒此節點的下個節點, 這裏調用unparkSuccessor(h)喚醒,unparkSuccessor(h)源碼方法以前的文章已經描述過(連接)這裏就再也不贅述。

  • (4)若是頭節點爲初始狀態0這設置狀態爲PROPAGATE。這裏不會喚醒他的下個節點,那麼下個節點會不會一直阻塞。其實下個節點線程不會一直阻塞,若是頭節點爲初始狀態0說明它的下個節點還沒調用shouldParkAfterFailedAcquire(p, node)方法改變頭節點的狀態爲SIGNAL。若是在步驟4前下個節點線程調用shouldParkAfterFailedAcquire(p, node)改變頭節點的狀態爲SIGNAL,shouldParkAfterFailedAcquire(p, node)會返回false,繼續循環這時下個節點線程不會阻塞直接進入setHeadAndPropagate(node, r)方法。若是當前線程執行步驟4設置失敗,說明下個節點線程在調用此方法以前先調用shouldParkAfterFailedAcquire(p, node)改變頭節點的狀態,這樣也不影響流程,這時從新循環調用unparkSuccessor(h)便可。不過本人沒看出步驟4在CountDownLatch有什麼實質用處。

  • (5)步驟5是循環退出的條件若是頭節點未變退出循環,不然繼續循環,頭節點何時會改變,一個是喚醒下個節點後下個節點在步驟5前調用setHeadAndPropagate(node, r)方法更新頭節點,另一個就是另外一個線程添加節點後沒有阻塞直接setHeadAndPropagate(node, r)方法更新頭節點。

相關文章
相關標籤/搜索