Java多線程編程(5)--線程間通訊

一.等待與通知

  某些狀況下,程序要執行的操做須要知足必定的條件(下文統一將其稱之爲保護條件)才能執行。在單線程編程中,咱們可使用輪詢的方式來實現,即頻繁地判斷是否知足保護條件,若不知足則繼續判斷,若知足則開始執行。但在多線程編程中,這種方式無疑是很是低效的。若是一個線程持續進行無心義的判斷而不釋放CPU,這就會形成資源的浪費;而若是定時去判斷,不知足保護條件就釋放CPU,又會形成頻繁的上下文切換。總之,不推薦在多線程編程中使用輪詢的方式。
  等待與通知是這樣一種機制:當保護條件不知足時,能夠將當前線程暫停;而當保護條件成立時,再將這個線程喚醒。一個線程因其保護條件未知足而被暫停的過程就被稱爲等待,一個線程使得其餘線程的保護條件得以知足的時候喚醒那些被暫停的線程的過程就被稱爲通知。java

1.wait

  在Java平臺中,Object.wait方法能夠用來實現等待,下面是wait方法的三個重載方法:編程

  • void wait(long timeoutMillis)
    調用該方法會使線程進入TIMED_WAITING狀態,當等待時間結束或其餘線程調用了該對象的notify或notifyAll方法時會將該線程喚醒。
  • void wait(long timeoutMillis, int nanos)
    這個方法看上去能夠精確到納秒級別,但實際上並非。若是nanos的值在0~999999之間,就給timeoutMillis加1,而後調用wait(timeoutMillis)。
  • void wait()
    該方法至關於wait(0),即永不超時。調用後當前線程會進入WAITING狀態,直到其餘線程調用了該對象的notify或notifyAll方法。

  先經過一張圖來介紹wait的實現機制:

  在上一篇文章中咱們瞭解到JVM會爲每一個對象維護一個入口集(Entry Set)用於存儲申請該對象內部鎖的線程。此外,JVM還會爲每一個對象維護一個被稱爲等待集(Wait Set)的隊列,該隊列用於存儲該對象上的等待線程。當在線程中調用某個對象(這裏咱們稱之爲對象A)的wait方法後,當前線程會釋放內部鎖並進入WAITING或TIMED_WAITING狀態,而後進入等待集中。當其餘線程調用對象A的notify方法後,等待集中的某個線程會被喚醒並被移出等待集。這個線程可能會立刻得到內部鎖,也有可能因競爭內部鎖失敗而進入入口集,直到得到內部鎖。當從新獲取到內部鎖後,wait方法纔會返回,當前線程繼續執行後面的代碼。
  因爲wait方法會釋放內部鎖,所以在wait方法中會判斷當前線程是否持有被調用wait方法的對象的內部鎖。若是當前線程沒有持有該對象的內部鎖,JVM會拋出一個IllegalMonitorStateException異常。所以,wait方法在調用時當前線程必須持有該對象的內部鎖,即wait方法的調用必需要放在由該對象引導的synchronized同步塊中。綜上所述,使用wait方法實現等待的代碼模板以下僞代碼所示:安全

synchronized(someObject) {
    while(!someCondition) {
        someObject.wait();
    }
    doSomething();
}

  這裏使用while而不是if的緣由是,通知線程可能只是更新了保護條件中的共享變量,但並不必定會使保護條件成立;即便通知線程能夠保證保護條件成立,可是在線程從等待集進入入口集再到獲取到內部鎖的這段時間內,其餘線程仍然可能更新共享變量而致使保護條件不成立。線程雖然由於保護條件不成立而進入wait方法,但wait方法的返回並不能說明保護條件已經成立。所以,在wait方法返回後須要再次進行判斷,若保護條件成立則執行接下來的操做,不然應該繼續進入wait方法。正是基於這種考慮,咱們應該將wait方法的調用放在while循環而不是if判斷中。多線程

2.notify/notifyAll

  下圖是notify的實現機制:

  和wait方法同樣,notify方法在執行時也必須持有對象的內部鎖,不然會拋出IllegalMonitorStateException異常,所以notify方法也必須放在由該對象引導的synchronized同步塊中。notify方法會將等待集中的任意一個線程移出隊列。和wait方法不一樣的是,notify方法自己不會釋放內部鎖,而是在臨界區代碼執行完成後自動釋放。所以,爲了使等待線程在其被喚醒以後可以儘快得到內部鎖,應該儘量地將notify調用放在靠近臨界區結束的地方。
  調用notify方法所喚醒的線程是相應對象上的一個任意等待線程,可是這個被喚醒的線程可能不是咱們真正想要喚醒的那個線程。所以,有時候咱們須要藉助notifyAll,它和notify方法的惟一不一樣之處在於它能夠喚醒相應對象上的全部等待線程。dom

3.過早喚醒問題

  假設通知線程N和等待線程W1和W2同步在對象obj上,W1和W2的保護條件C1和C2均依賴於obj的實例變量state,但C1和C2判斷的內容並不相同。初始狀態下C1和C2均不成立。某一時刻,當線程N更新了共享變量state使得保護條件C1得以成立,此時爲了喚醒W1而執行了obj.notifyAll()方法(調用obj.notify()並不必定會喚醒W1)。因爲notifyAll喚醒的是obj上的全部等待線程,所以W2也會被喚醒,即便W2的保護條件並未成立。這就使得W2在被喚醒以後仍然須要繼續等待。這種等待線程在保護條件並未成立的狀況下被喚醒的現象被稱爲過早喚醒。過早喚醒使得那些無需被喚醒的等待線程也被喚醒了,形成了資源的浪費。過早喚醒問題能夠利用下一節中介紹的Condition接口來解決。ide

二.條件變量Condition

  總的來講,Object.wait()/notify()過於底層,且Object.wait(long timeout)還存在過早喚醒和沒法區分其返回是因爲等待超時仍是被通知線程喚醒的問題。不過,瞭解wait/notify有助於咱們閱讀部分源碼,以及學習和使用Condition接口。
  Condition接口能夠做爲wait/notify的替代品來實現等待/通知,它爲解決過早喚醒問題提供了支持,並解決了Object.wait(long timeout)沒法區分其返回是因爲等待超時仍是被通知線程喚醒的問題。Condition接口中定義瞭如下方法:

  在上一篇文章中,咱們在介紹Lock接口時曾經提到過它的newCondition方法,它返回的就是一個Condition實例。相似於Object.wait()/notify()要求其執行線程必須持有這些方法所屬對象的內部鎖,Condition.await()/signal()也要求其執行線程持有建立該Condition實例的顯式鎖。每一個Condition實例內部都維護了一個用於存儲等待線程的隊列。設condition1和condition2是從一個顯式鎖上獲取的兩個不一樣的Condition實例,一個線程執行condition1.await()會致使其被暫停並進入condition1的等待隊列。condition1.signal()會使condition1的等待隊列中的一個任意線程被喚醒,而condition1.signaAll()則會使condition1的等待隊列中的全部線程被喚醒,而condition2的等待隊列中的線程則不受影響。
  和wait/notify相似,await/signal的使用方法以下:工具

public class ConditionUsage {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void waitMethod() throws InterruptedException {
        lock.lock();
        try {
            while (保護條件不成立) {
                condition.await();
            }
            // 業務邏輯
        } finally {
            lock.unlock();
        }
    }
    
    public void notifyMethod() {
        lock.unlock();
        try {
            // 更新共享變量
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

  最後,以一個例子來結束本小節。這裏咱們以經典的生產者-消費者模型來舉例。假設有一個生產整數的生產者,一個消費奇數的消費者和一個消費偶數的消費者。當生產奇數時,生產者會通知奇數消費者,偶數同理。下面是完整代碼:
學習


展開查看

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    private final Lock lock = new ReentrantLock();
    private final Condition oddCondition = lock.newCondition();
    private final Condition evenCondition = lock.newCondition();
    private final Random random = new Random();
    private volatile Integer message;
    private AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        ConditionDemo demo = new ConditionDemo();
        Thread producer = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                demo.produce();
            }
        });
        producer.start();
        Thread oddConsumer = new Thread(() -> {
            while (true) {
                try {
                    demo.consumeOdd();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread evenConsumer = new Thread(() -> {
            while (true) {
                try {
                    demo.consumeEven();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        oddConsumer.start();
        evenConsumer.start();
    }

    public void produce() {
        lock.lock();
        if (message == null) {
            message = random.nextInt(100) + 1;
            count.incrementAndGet();
            if (message % 2 == 0) {
                evenCondition.signal();
                System.out.println("Produce even : " + message);
            } else {
                oddCondition.signal();
                System.out.println("Produce odd : " + message);
            }
        }
        lock.unlock();
    }

    public void consumeOdd() throws InterruptedException {
        lock.lock();
        while (message == null) {
            oddCondition.await();
        }
        System.out.println("Consume odd : " + message);
        message = null;
        lock.unlock();
    }

    public void consumeEven() throws InterruptedException {
        lock.lock();
        while (message == null) {
            evenCondition.await();
        }
        System.out.println("Consume even : " + message);
        message = null;
        lock.unlock();
    }
}

  該程序的輸出以下:this

Produce even : 34
Consume even : 34
Produce odd : 43
Consume odd : 43
Produce even : 28
Consume even : 28
Produce odd : 27
Consume odd : 27
Produce even : 92
Consume even : 92
...

三.倒數計數器CountDownLatch

  有時候,咱們但願一個線程在另外一個或多個線程結束以後再繼續執行,這時候咱們最早想到的確定是Thread.join()。有時咱們又但願一個線程不必定須要其餘線程結束,而只是等其餘線程執行完特定的操做就繼續執行。這種狀況下沒法使用Thread.join(),由於它會致使當前線程等待其餘線程徹底結束。固然,此時能夠用共享變量來實現。不過,Java爲咱們提供了更加方便的工具類來解決上面說的這些狀況,那就是CountDownLatch。
  能夠將CountDownLatch理解爲一個能夠在多個線程之間使用的計數器。這個類提供瞭如下方法:

  CountDownLatch內部也維護了一個用於存放等待線程的隊列。當計數器不爲0時,調用await方法的線程會被暫停並進入該隊列。當某個線程調用countDown方法的時候,計數器會減1。當計數器到0的時候,等待隊列中的全部線程都會被喚醒。計數器的初始值是在CountDownLatch的構造方法中指定的:atom

public CountDownLatch(int count)

  當計數器的值達到0以後就不會再變化。此時,調用countDown方法並不會致使異常的拋出,而且後續執行await方法的線程也不會被暫停。所以,CountDownLatch的使用是一次性的。此外,因爲CountDownLatch是線程安全的,所以在調用await、countDown方法時無需加鎖。
  下面的例子中,主線程等待兩個子線程結束以後再繼續執行。這裏使用了CountDownLatch來實現:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);
        Runnable task = () -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " finished.");
            latch.countDown();
        };
        new Thread(task, "Thread 1").start();
        new Thread(task, "Thread 2").start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            return;
        }
        System.out.println("Main thread continued.");
    }
}

  該程序輸出以下:

Thread 2 finished.
Thread 1 finished.
Main thread continued.

  能夠看到,當線程1和線程2執行完成後,主線程纔開始繼續執行。
  若是CountDownLatch內部計數器因爲程序的錯誤而永遠沒法達到0,那麼相應實例上的等待線程會一直處於WAITING狀態。避免該問題的出現有兩種方法:一是確保全部對countDown方法的調用都位於代碼中正確的位置,例如放在finally塊中。二是使用帶有時間限制的await方法。若是在規定時間內計時器值未達到0,該CountDownLatch實例上的等待線程也會被喚醒。該方法的返回值能夠用於區分其返回是不是因爲等待超時。

四.循環屏障CyclicBarrier

  有時候多個線程可能須要互相等待對方執行到代碼中的某個地方纔能繼續執行。這就相似於咱們在開會的時候必須等待全部與會人員都到場以後才能開始。Java中爲咱們提供了一個工具類CyclicBarrier,該類能夠用來實現這種等待。
  使用CyclicBarrier實現等待的線程被稱爲參與方(Party)。參與方只須要CyclicBarrier.await()就能夠實現等待。和CountDownLatch相似,CyclicBarrier也有一個計數器。當最後一個線程調用CyclicBarrier.await()時,以前的等待線程都會被喚醒,而最後一個線程自己並不會被暫停。和CountDownLatch不一樣的是,CyclicBarrier是能夠重複使用的,這也是爲何它的類名中含有Cyclic。當全部參與方被喚醒的時候,任何線程再次執行await方法又會致使該線程被暫停。
  CyclicBarrier提供了兩個構造器:

public CyclicBarrier​(int parties)
public CyclicBarrier​(int parties, Runnable barrierAction)

  能夠看到,在構造CyclicBarrier​時,必須提供參與方的數量。第二個構造器還容許咱們指定一個被稱爲barrierAction的任務(Runnable接口實例),該任務會被最後一個執行await方法的線程執行。所以,若是有須要在喚醒全部線程前執行的操做,可使用這個構造器。
  CyclicBarrier提供瞭如下6個方法:

1.public int await() throws InterruptedException,BrokenBarrierException

  若是當前線程不是最後一個參與方,那麼該線程在調用await()後將持續等待直到如下狀況發生:

  • 最後一個線程到達;
  • 當前線程被中斷;
  • 其餘正在等待的線程被中斷;
  • 其餘線程等待超時;
  • 其餘線程調用了當前屏障的reset()。

  若是當前線程在進入await()方法使已經被標記中斷狀態或在等待時被中斷,那麼await()將會拋出InterruptedException並清除當前線程的中斷狀態。
  若是屏障在參與方等待時被重置或被破壞,或者在調用await()時屏障已經被破壞,那麼await()將會拋出BrokenBarrierException。
  若是某個線程在等待時被中斷,那麼其餘等待線程將會拋出BrokenBarrierException而且屏障也會被標記爲broken狀態。
  該方法的返回值表示當前線程的到達索引,getParties()-1表示第一個到達,0表示最後一個到達。

2.public int await​(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException

  該方法與至關於有時間限制的await(),等待時間結束以後該線程將會拋出TimeOutException,屏障會被標記爲broken狀態,其餘正在等待的線程則會拋出BrokenBarrierException。

3.public int getNumberWaiting()

  返回當前正在等待的參與方的數量。

4.public int getParties()

  返回總的參與方的數量。

5.public boolean isBroken()

  若是該屏障已經被破壞則返回true,不然返回false。當等待線程超時或被中斷,或者在執行barrierAction時出現異常,屏障將會被破壞。

6.public void reset()

  將屏障恢復到初始狀態,若是有正在等待的線程,這些線程會拋出BrokenBarrierException異常。

  下面咱們經過一個例子來學習如何使用CyclicBarrier。假設如今正在舉行短跑比賽,共有8名參賽選手,而場地上只有4條賽道,所以須要分爲兩場比賽。每場比賽必須等4名選手全都就緒才能夠開始,而上一場比賽結束以後即所有選手離開賽道以後才能進行下一場比賽。該示例代碼以下所示:


展開查看

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrierDemo {
    private CyclicBarrier startBarrier = new CyclicBarrier(4, () -> System.out.println("比賽開始!"));
    private CyclicBarrier shiftBarrier = new CyclicBarrier(4, () -> System.out.println("比賽結束!"));
    private Runner[] runners = new Runner[8];
    private AtomicInteger next = new AtomicInteger(0);

    CyclicBarrierDemo() {
        for (int i = 0; i < 8; i++) {
            runners[i] = new Runner(i / 4 + 1, i % 4 + 1);
        }
    }

    public static void main(String[] args) {
        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        for (int i = 0; i < 4; i++) {
            demo.new Track().start();
        }
    }

    private class Track extends Thread {
        private Random random = new Random();

        @Override
        public void run() {
            for (int i = 0; i < 2; i++) {
                try {
                    Runner runner = runners[next.getAndIncrement()];
                    System.out.println(runner.getGroup() + "組" + runner.getNumber() + "號準備就緒!");
                    startBarrier.await();
                    System.out.println(runner.getGroup() + "組" + runner.getNumber() + "號出發!");
                    Thread.sleep((random.nextInt(5) + 1) * 1000);
                    System.out.println(runner.getGroup() + "組" + runner.getNumber() + "號到達終點!");
                    shiftBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static class Runner {
        private int group;
        private int number;

        Runner(int group, int number) {
            this.group = group;
            this.number = number;
        }

        int getGroup() {
            return group;
        }

        int getNumber() {
            return number;
        }
    }
}

  該程序輸出以下:


展開查看

1組4號準備就緒!
1組2號準備就緒!
1組3號準備就緒!
1組1號準備就緒!
比賽開始!
1組4號出發!
1組2號出發!
1組1號出發!
1組3號出發!
1組3號到達終點!
1組2號到達終點!
1組4號到達終點!
1組1號到達終點!
比賽結束!
2組1號準備就緒!
2組2號準備就緒!
2組3號準備就緒!
2組4號準備就緒!
比賽開始!
2組4號出發!
2組1號出發!
2組3號出發!
2組2號出發!
2組1號到達終點!
2組4號到達終點!
2組3號到達終點!
2組2號到達終點!
比賽結束!

五.總結

  實際上,線程間的通訊方式遠不止上面介紹的這些,還有不少手段能夠在線程間傳遞信息,例如阻塞隊列、信號量、線程中斷機制等,咱們將會在以後的文章中進一步學習這部份內容。

相關文章
相關標籤/搜索