Java併發(6)- CountDownLatch、Semaphore與AQS

引言

上一篇文章中詳細分析了基於AQS的ReentrantLock原理,ReentrantLock經過AQS中的state變量0和1之間的轉換表明了獨佔鎖。那麼能夠思考一下,當state變量大於1時表明了什麼?J.U.C中是否有基於AQS的這種實現呢?若是有,那他們都是怎麼實現的呢?這些疑問經過詳細分析J.U.C中的Semaphore與CountDownLatch類後,將會獲得解答。node

  1. Semaphore與CountDownLatch的共享邏輯
  2. Semaphore與CountDownLatch的使用示例
  • 2.1 Semaphore的使用
  • 2.2 CountDownLatch的使用
  1. 源碼分析
  • 3.1 AQS中共享鎖的實現
  • 3.2 Semaphore源碼分析
  • 3.3 CountDownLatch源碼分析
  1. 總結

1. Semaphore與CountDownLatch的共享方式

獨佔鎖意味着只能有一個線程獲取鎖,其餘的線程在鎖被佔用的狀況下都必須等待鎖釋放後才能進行下一步操做。由此類推,共享鎖是否意味着能夠由多個線程同時使用這個鎖,不須要等待呢?若是是這樣,那鎖的意義也就不存在了。在J.U.C中共享意味着有多個線程能夠同時獲取鎖,但這個多個是有限制的,並非無限個,J.U.C中經過Semaphore與CountDownLatch來分別實現了兩種有限共享鎖。bash

Semaphore又叫信號量,他經過一個共享的’信號包‘來給每一個使用他的線程來分配信號,當信號包中的信號足夠時,線程能夠獲取鎖,反之,信號包中信號不夠了,則不能獲取到鎖,須要等待足夠的信號被釋放,才能獲取。ide

CountDownLatch又叫計數器,他經過一個共享的計數總量來控制線程鎖的獲取,當計數器總量大於0時,線程將被阻塞,不可以獲取鎖,只有當計數器總量爲0時,全部被阻塞的線程同時被釋放。oop

能夠看到Semaphore與CountDownLatch都有一個共享總量,這個共享總量就是經過state來實現的。源碼分析

2. Semaphore與CountDownLatch的使用示例

在詳細分析Semaphore與CountDownLatch的原理以前,先來看看他們是怎麼使用的,這樣方便後續咱們理解他們的原理。先知道他是什麼?而後再問爲何?下面經過兩個示例來詳細說明Semaphore與CountDownLatch的使用。ui

2.1 Semaphore的使用

//初始化10個信號量在信號包中,讓ABCD4個線程分別去獲取
public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(10);
	SemaphoreTest(semaphore);
}

private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException {
    //線程A初始獲取了4個信號量,而後分3次釋放了這4個信號量
    Thread threadA = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(4);
                System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                semaphore.release(1);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                semaphore.release(1);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                semaphore.release(2);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadA.setName("threadA");

    //線程B初始獲取了5個信號量,而後分2次釋放了這5個信號量
    Thread threadB = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(5);
                System.out.println(Thread.currentThread().getName() + " get 5 semaphore");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                semaphore.release(2);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 3 semaphore");
                semaphore.release(3);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadB.setName("threadB");

    //線程C初始獲取了4個信號量,而後分1次釋放了這4個信號量
    Thread threadC = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(4);
                System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " release 4 semaphore");
                semaphore.release(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadC.setName("threadC");
    
    //線程D初始獲取了10個信號量,而後分1次釋放了這10個信號量
    Thread threadD = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(10);
                System.out.println(Thread.currentThread().getName() + " get 10 semaphore");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " release 10 semaphore");
                semaphore.release(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadD.setName("threadD");
    
    //線程A和線程B首先分別獲取了4個和5個信號量,總信號量變爲了1個
    threadA.start();
    threadB.start();
    Thread.sleep(1);
    //線程C嘗試獲取4個發現不夠則等待
    threadC.start();
    Thread.sleep(1);
    //線程D嘗試獲取10個發現不夠則等待
    threadD.start();
}
複製代碼

執行結果以下:this

threadB get 5 semaphore
threadA get 4 semaphore
threadA release 1 semaphore
threadB release 2 semaphore
threadC get 4 semaphore
threadA release 1 semaphore
threadC release 4 semaphore
threadB release 3 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore
複製代碼

能夠看到threadA和threadB在獲取了9個信號量以後threadC和threadD以後等待信號量足夠時才能繼續往下執行。而threadA和threadB在信號量足夠時是能夠同時執行的。spa

其中有一個問題,當threadD排隊在threadC以前時,信號量若是被釋放了4個,threadC會先於threadD執行嗎?仍是須要排隊等待呢?這個疑問在詳細分析了Semaphore的源碼以後再來給你們答案。線程

2.2 CountDownLatch的使用

//初始化計數器總量爲2
public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    CountDownLatchTest(countDownLatch);
}

private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException {
    //threadA嘗試執行,計數器爲2被阻塞
    Thread threadA = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " await");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadA.setName("threadA");

    //threadB嘗試執行,計數器爲2被阻塞
    Thread threadB = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " await");
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadB.setName("threadB");
    
    //threadC在1秒後將計數器數量減1
    Thread threadC = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                countDownLatch.countDown();
                
                System.out.println(Thread.currentThread().getName() + " countDown");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadC.setName("threadC");
    
    //threadD在5秒後將計數器數量減1
    Thread threadD = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                Thread.sleep(5000);
                countDownLatch.countDown();
                
                System.out.println(Thread.currentThread().getName() + " countDown");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadD.setName("threadD");
    
    threadA.start();
    threadB.start();
    threadC.start();
    threadD.start();
}
複製代碼

執行結果以下:code

threadC countDown
threadD countDown
threadA await
threadB await
複製代碼

threadA和threadB在嘗試執行時因爲計數器總量爲2被阻塞,當threadC和threadD將計數器總量減爲0後,threadA和threadB同時開始執行。

總結一下:Semaphore就像旋轉壽司店,共有10個座位,當座位有空餘時,等待的人就能夠坐上去。若是有隻有2個空位,來的是一家3口,那就只有等待。若是來的是一對情侶,就能夠直接坐上去吃。固然若是同時空出5個空位,那一家3口和一對情侶能夠同時上去吃。CountDownLatch就像大型商場裏面的臨時遊樂場,每一場遊樂的時間事後等待的人同時進場玩,而一場中間會有不愛玩了的人隨時出來,但不能進入,一旦全部進入的人都出來了,新一批人就能夠同時進場。

3. 源碼分析

明白了Semaphore與CountDownLatch是作什麼的,怎麼使用的。接下來就來看看Semaphore與CountDownLatch底層時怎麼實現這些功能的。

3.1 AQS中共享鎖的實現

上篇文章經過對ReentrantLock的分析,得倒了AQS中實現獨佔鎖的幾個關鍵方法:

//狀態量,獨佔鎖在0和1之間切換
private volatile int state;

//調用tryAcquire獲取鎖,獲取失敗後加入隊列中掛起等操做,AQS中實現
public final void acquire(int arg);

//獨佔模式下嘗試獲取鎖,ReentrantLock中實現
protected boolean tryAcquire(int arg);

//調用tryRelease釋放鎖以及恢復線程等操做,AQS中實現
public final boolean release(int arg);

//獨佔模式下嘗試釋放鎖,ReentrantLock中實現
protected boolean tryRelease(int arg);
複製代碼

其中具體的獲取和釋放獨佔鎖的邏輯都放在ReentrantLock中本身實現,AQS中負責管理獲取或釋放獨佔鎖成功失敗後須要具體處理的邏輯。那麼共享鎖的實現是否也是遵循這個規律呢?由此咱們在AQS中發現瞭如下幾個相似的方法:

//調用tryAcquireShared獲取鎖,獲取失敗後加入隊列中掛起等操做,AQS中實現
public final void acquireShared(int arg);

//共享模式下嘗試獲取鎖
protected int tryAcquireShared(int arg);

//調用tryReleaseShared釋放鎖以及恢復線程等操做,AQS中實現
public final boolean releaseShared(int arg);

//共享模式下嘗試釋放鎖
protected boolean tryReleaseShared(int arg);
複製代碼

共享鎖和核心就在上面4個關鍵方法中,先來看看Semaphore是怎麼調用上述方法來實現共享鎖的。

3.2 Semaphore源碼分析

首先是Semaphore的構造方法,同ReentrantLock同樣,他有兩個構造方法,這樣也是爲了實現公平共享鎖和非公平共享鎖,你們可能有疑問,既然是共享鎖,爲何還分公平和非公平的呢?這就回到了上面那個例子後面的疑問,前面有等待的線程時,後來的線程是否能夠直接獲取信號量,仍是必定要排隊。等待固然是公平的,插隊就是非公平的。

仍是用旋轉壽司的例子來講:如今只有2個空位,已經有一家3口在等待,這時來了一對情侶,公平共享鎖的實現就是這對情侶必須等待,只到一家3口上桌以後才輪到他們,而非公平共享鎖的實現是可讓這對狀況直接去吃,由於恰好有2個空位,讓一家3口繼續等待(好像是很不公平......),這種狀況下非公平共享鎖的好處就是能夠最大化壽司店的利潤(好像同時也得罪了等待的顧客......),也是Semaphore默認的實現方式。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
複製代碼

Semaphore的例子中使用了兩個核心方法acquire和release,分別調用了AQS中的acquireSharedInterruptibly和releaseShared方法:

//獲取permits個信號量
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

//釋放permits個信號量
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //嘗試獲取arg個信號量
        doAcquireSharedInterruptibly(arg); //獲取信號量失敗時排隊掛起
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //嘗試釋放arg個信號量
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

Semaphore在獲取和釋放信號量的流程都是經過AQS來實現,具體怎麼算獲取成功或釋放成功則由Semaphore自己實現。

//公平共享鎖嘗試獲取acquires個信號量
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors()) //前面是否有排隊,有則返回獲取失敗
            return -1;
        int available = getState(); //剩餘的信號量(旋轉壽司店剩餘的座位)
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 剩餘信號量不夠,夠的狀況下嘗試獲取(旋轉壽司店座位不夠,或者同時來兩對狀況搶座位)
            return remaining;
    }
}
//非公平共享鎖嘗試獲取acquires個信號量
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState(); //剩餘的信號量(旋轉壽司店剩餘的座位)
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 剩餘信號量不夠,夠的狀況下嘗試獲取(旋轉壽司店座位不夠,或者同時來兩對情侶搶座位)
            return remaining;
    }
}
複製代碼

能夠看到公平共享鎖和非公平共享鎖的區別就在是否須要判斷隊列中是否有已經等待的線程。公平共享鎖須要先判斷,非公平共享鎖直接插隊,儘管前面已經有線程在等待。

爲了驗證這個結論,稍微修改下上面的示例:

threadA.start();
threadB.start();
Thread.sleep(1);
threadD.start(); //threadD已經在排隊
Thread.sleep(3500);
threadC.start(); //3500毫秒後threadC來插隊
複製代碼

結果輸出:

threadB get 5 semaphore
threadA get 4 semaphore
threadB release 2 semaphore
threadA release 1 semaphore
threadC get 4 semaphore //threadC先與threadD獲取到信號量
threadA release 1 semaphore
threadB release 3 semaphore
threadC release 4 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore
複製代碼

這個示例很好的說明了當爲非公平鎖時會先嚐試獲取共享鎖,而後才排隊。

當獲取信號量失敗以後會去排隊,排隊這個操做經過AQS中的doAcquireSharedInterruptibly方法來實現:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); //加入等待隊列
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor(); //獲取當前節點的前置節點
            if (p == head) {
                int r = tryAcquireShared(arg); //前置節點是頭節點時,說明當前節點是第一個掛起的線程節點,再次嘗試獲取共享鎖
                if (r >= 0) {
                    setHeadAndPropagate(node, r); //與ReentrantLock不一樣的地方:獲取共享鎖成功設置頭節點,同時通知下一個節點
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && //非頭節點或者獲取鎖失敗,檢查節點狀態,查看是否須要掛起線程
                parkAndCheckInterrupt()) //掛起線程,當前線程阻塞在這裏!
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

這一段代碼和ReentrantLock中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法基本同樣,說下兩個不一樣的地方。一是加入等待隊列時這裏加入的是Node.SHARED類型的節點。二是獲取鎖成功後會通知下一個節點,也就是喚醒下一個線程。以旋轉壽司店的例子爲例,前面同時走了5個客人,空餘5個座位,一家3口坐進去以後會告訴後面的一對情侶,讓他們也坐進去,這樣就達到了共享的目的。shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法在上一篇文章中都有詳細說明,這裏就作解釋了。

再來看看releaseShared方法時怎麼釋放信號量的,首先調用tryReleaseShared來嘗試釋放信號量,釋放成功後調用doReleaseShared來判斷是否須要喚醒後繼線程:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow //釋放信號量過多
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next)) //cas操做設置新的信號量
            return true;
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { //SIGNAL狀態下喚醒後繼節點
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); //喚醒後繼節點
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
複製代碼

釋放的邏輯很好理解,相比ReentrantLock只是在state的數量上有點差異。

3.3 CountDownLatch源碼分析

CountDownLatch相比Semaphore在實現邏輯上要簡單的多,同時他也沒有公平和非公平的區別,由於當計數器達到0的時候,全部等待的線程都會釋放,不爲0的時候,全部等待的線程都會阻塞。直接來看看CountDownLatch的兩個核心方法await和countDown。

public void await() throws InterruptedException {
    //和Semaphore的不一樣在於參數爲1,其實這個參數對CountDownLatch來講沒什麼意義,由於後面CountDownLatch的tryAcquireShared實現是經過getState() == 0來判斷的
    sync.acquireSharedInterruptibly(1); 
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    //這裏加入了一個等待超時控制,超過期間後直接返回false執行後面的代碼,不會長時間阻塞
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 
}

public void countDown() {
    sync.releaseShared(1); //每次釋放1個計數
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //嘗試獲取arg個信號量
        doAcquireSharedInterruptibly(arg); //獲取信號量失敗時排隊掛起
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; //奠基了同時獲取鎖的基礎,不管State初始爲多少,只能計數等於0時觸發
}
複製代碼

和Semaphore區別有兩個,一是State每次只減小1,同時只有爲0時才釋放全部等待線程。二是提供了一個超時等待方法。acquireSharedInterruptibly方法跟Semaphore同樣,就不細說了,這裏重點說下tryAcquireSharedNanos方法。

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

//最小自旋時間
static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout; //計算了一個deadline
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime(); 
            if (nanosTimeout <= 0L) //超時後直接返回false,繼續執行
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) //大於最小cas操做時間則掛起線程
                LockSupport.parkNanos(this, nanosTimeout); //掛起線程也有超時限制
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

重點看標了註釋的幾行代碼,首先計算了一個超時時間,當超時後直接退出等待,繼續執行。若是未超時而且大於最小的cas操做時間,這裏定義的是1000ns,則掛起,同時掛起操做也有超時限制。這樣就實現了超時等待。

4.總結

至此關於AQS的共享鎖的兩個實現Semaphore與CountDownLatch就分析完了,他們與非共享最大的區別就在因而否能多個線程同時獲取鎖。看完後但願你們能對Semaphore與CountDownLatch有深入的理解,不明白的時候想一想旋轉壽司店和遊樂場的例子,若是對你們有幫助,以爲寫的好的話,能夠點個贊,固然更但願你們能積極指出文中的錯誤和提出積極的改進意見。

相關文章
相關標籤/搜索