Java併發編程之CountDownLatch

簡介

在平常的開發中,可能會遇到這樣的場景:開啓多個子線程執行一些耗時任務,而後在主線程彙總,在子線程執行的過程當中,主線程保持阻塞狀態直到子線程完成任務。ide

使用CountDownLatch類或者Thread類的join()方法都能實現這一點,下面經過例子來介紹這兩種實現方法。函數

CountDownLatch的使用

一個小例子,等待全部玩家準備就緒,而後遊戲纔開始。ui

使用join方法實現:this

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            System.out.println(Thread.currentThread().getName() + ":準備就緒");
        };

        Thread thread1 = new Thread(runnable, "一號玩家");
        Thread thread2 = new Thread(runnable, "二號玩家");
        Thread thread3 = new Thread(runnable, "三號玩家");
        Thread thread4 = new Thread(runnable, "四號玩家");
        Thread thread5 = new Thread(runnable, "五號玩家");
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();

	//主線程等待子線程執行完成再執行
        thread1.join();
        thread2.join();
        thread3.join();
        thread4.join();
        thread5.join();

        System.out.println("---遊戲開始---");
    }
}

/*
 * 輸出結果:
 * 二號玩家:準備就緒
 * 五號玩家:準備就緒
 * 四號玩家:準備就緒
 * 三號玩家:準備就緒
 * 一號玩家:準備就緒
 * ---遊戲開始---
 */

使用CountDownLatch實現:線程

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        //建立計數器初始值爲5的CountDownLatch
        CountDownLatch countDownLatch = new CountDownLatch(5);

        Runnable runnable = () -> {
            try{
                System.out.println(Thread.currentThread().getName() + ":準備就緒");
            }catch (Exception ex){
                ex.printStackTrace();
            }finally {
                //計數器值減一
                countDownLatch.countDown();
            }
        };

        Thread thread1 = new Thread(runnable, "一號玩家");
        Thread thread2 = new Thread(runnable, "二號玩家");
        Thread thread3 = new Thread(runnable, "三號玩家");
        Thread thread4 = new Thread(runnable, "四號玩家");
        Thread thread5 = new Thread(runnable, "五號玩家");
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();

        //等待計數器值爲0
        countDownLatch.await();
        System.out.println("---遊戲開始---");
    }
}

/*
 * 輸出結果:
 * 四號玩家:準備就緒
 * 五號玩家:準備就緒
 * 一號玩家:準備就緒
 * 三號玩家:準備就緒
 * 二號玩家:準備就緒
 * ---遊戲開始---
 */

CountDownLatch內部包含一個計數器,計數器的初始值爲CountDownLatch構造函數傳入的int類型的參數,countDown方法會遞減計數器值,await方法會阻塞當前線程直到計數器值爲0。繼承

兩種方式的區別:

當調用子線程的join方法時,會阻塞當前線程直到子線程結束。而CountDownLatch相對比較靈活,無需等到子線程結束,只要計數器值爲0,await方法就會返回。隊列

CountDownLatch源碼

CountDownLatch源碼:遊戲

public class CountDownLatch {
    /**
     * CountDownLatch的同步控制,使用AQS的狀態值做爲計數器值。
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * 構造函數,初始化計數器
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * 阻塞當前線程直到計數器值爲0
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 阻塞當前線程直到計數器值爲0或者超時
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 遞減計數器值,當計數器值爲0時,釋放全部等待的線程。
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * 返回當前計數器值
     */
    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

經過源碼能夠看出,CountDownLatch內部是使用AQS實現的,它使用AQS的狀態變量state做爲計數器值,靜態內部類Sync繼承了AQS並實現了tryAcquireShared和tryReleaseShared方法。資源

接下來重點看下await()和countDown()的源碼:開發

await()方法內部調用的是AQS的acquireSharedInterruptibly方法,會將當前線程放入AQS隊列等待,直到計數值爲0。

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
	//判斷當前線程是否被中斷,若是線程被中斷則拋出異常
	if (Thread.interrupted())
		throw new InterruptedException();
	//判斷計數器值是否爲0,爲0則直接返回,不然進AQS隊列進行等待。
	if (tryAcquireShared(arg) < 0)
		doAcquireSharedInterruptibly(arg);
}

//CountDownLatch中Sync的tryAcquireShared方法實現,直接判斷計數器值是否爲0。
protected int tryAcquireShared(int acquires) {
	return (getState() == 0) ? 1 : -1;
}

countDown()方法內部調用的是AQS的releaseShared方法,每次調用都會遞減計數值,直到計數值爲0則調用AQS釋放資源的方法。

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		//釋放資源
		doReleaseShared();
		return true;
	}
	return false;
}

//CountDownLatch中Sync的tryReleaseShared方法實現
protected boolean tryReleaseShared(int releases) {
	for (;;) {
		int c = getState();
		//計數值爲0直接返回
		if (c == 0)
			return false;
		//設置遞減後的計數值
		int nextc = c-1;
		if (compareAndSetState(c, nextc))
			return nextc == 0;
	}
}
相關文章
相關標籤/搜索