併發框架

JUC中提供了幾個比較經常使用的併發工具類,好比CountDownLatch、CyclicBarrier、Semaphore。 其實在之前咱們 課堂的演示代碼中,或多或少都有用到過這樣一些api,接下來咱們會帶你們去深刻研究一些經常使用的api。 CountDownLatch
countdownlatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程的操做執行完畢再執行。從 命名能夠解讀到countdown是倒數的意思,相似於咱們倒計時的概念。 countdownlatch提供了兩個方法,一個是countDown,一個是await, countdownlatch初始化的時候須要傳入一 個整數,在這個整數倒數到0以前,調用了await方法的程序都必需要等待,而後經過countDown來倒數。node

public static void main(String[] args) throws InterruptedException {
 
    CountDownLatch countDownLatch=new CountDownLatch(3);
 
    new Thread(()->{
    
        countDownLatch.countDown();
 
    },"t1").start();
 
    new Thread(()->{
    
        countDownLatch.countDown();

    },"t2").start();
 

    new Thread(()->{

        countDownLatch.countDown();

    },"t3").start();

    countDownLatch.await();
 
    System.out.println("全部線程執行完畢");
 
}

從代碼的實現來看,有點相似join的功能,可是比join更加靈活。CountDownLatch構造函數會接收一個int類型的 參數做爲計數器的初始值,當調用CountDownLatch的countDown方法時,這個計數器就會減一。 經過await方法去阻塞去阻塞主流程
使用場景
1. 經過countdownlatch實現最大的並行請求,也就是可讓N個線程同時執行,這個我也是在課堂上寫得比較多 的
2. 好比應用程序啓動以前,須要確保相應的服務已經啓動,好比咱們以前在講zookeeper的時候,經過原生api連 接的地方有用到countDownLatch 源碼分析
CountDownLatch類存在一個內部類Sync,上節課咱們講過,它是一個同步工具,必定繼承了 AbstractQueuedSynchronizer。很顯然,CountDownLatch其實是是使得線程阻塞了,既然涉及到阻塞,就一 定涉及到AQS隊列 await
await函數會使得當前線程在countdownlatch倒計時到0以前一直等待,除非線程別中斷;從源碼中能夠得知await 方法會轉發到Sync的acquireSharedInterruptibly
方法
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } acquireSharedInterruptibly
這塊代碼主要是判斷當前線程是否獲取到了共享鎖; 上一節課提到過,AQS有兩種鎖類型,一種是共享鎖,一種是 獨佔鎖,在這裏用的是共享鎖; 爲何要用共享鎖,由於CountDownLatch能夠多個線程同時經過。api

public final void acquireSharedInterruptibly(int arg)
 
        throws InterruptedException {
doAcquireSharedInterruptibly 
獲取共享鎖
 
    if (Thread.interrupted()) //判斷線程是否中斷
 
        throw new InterruptedException();
 
    if (tryAcquireShared(arg) < 0) //若是等於0則返回1,不然返回-1,返回-1表示須要阻塞
 
        doAcquireSharedInterruptibly(arg);
 
} 在這裏,state的意義是count,若是計數器爲0,表示不須要阻塞,不然,只有在知足條件的狀況下才會被喚醒

doAcquireSharedInterruptibly
獲取共享鎖併發

private void doAcquireSharedInterruptibly(int arg)
 
    throws InterruptedException {
 
    final Node node = addWaiter(Node.SHARED); //建立一個共享模式的節點添加到隊列中    boolean failed = true;
 
    try {
 
        for (;;) { //自旋等待共享鎖釋放,也就是等待計數器等於0。
 
            final Node p = node.predecessor(); //得到當前節點的前一個節點
 
            if (p == head) {
 
                int r = tryAcquireShared(arg);//就判斷嘗試獲取鎖
 
                if (r >= 0) {//r>=0表示計數器已經歸零了,則釋放當前的共享鎖
 
                    setHeadAndPropagate(node, r);
 
                    p.next = null; // help GC
 
                    failed = false;
 
                    return;
 
                }
 
            } //當前節點不是頭節點,則嘗試讓當前線程阻塞,第一個方法是判斷是否須要阻塞,第二個方法是阻塞
 
            if (shouldParkAfterFailedAcquire(p, node) &&
 
                parkAndCheckInterrupt())
 
                throw new InterruptedException();
        }
 
    } finally {
 
        if (failed)
 
            cancelAcquire(node);
 
    }
 
}

待續。。。。。。。。。函數

相關文章
相關標籤/搜索