Java併發編程之線程間通信(下)-生產者與消費者

前文回顧

上一篇文章重點嘮叨了java中協調線程間通訊的wait/notify機制,它有力的保證了線程間通訊的安全性以及便利性。本篇將介紹wait/notify機制的一個應用以及更多線程間通訊的內容。java

生產者-消費者模式

目光從廁所轉到飯館,一個飯館裏一般都有好多廚師以及好多服務員,這裏咱們把廚師稱爲生產者,把服務員稱爲消費者,廚師和服務員是不直接打交道的,而是在廚師作好菜以後放到窗口,服務員從窗口直接把菜端走給客人就行了,這樣會極大的提高工做效率,由於省去了生產者和消費者之間的溝通成本。從java的角度看這個事情,每個廚師就至關於一個生產者線程,每個服務員都至關於一個消費者線程,而放菜的窗口就至關於一個緩衝隊列生產者線程不斷把生產好的東西放到緩衝隊列裏,消費者線程不斷從緩衝隊列裏取東西,畫個圖就像是這樣:程序員

圖片描述

現實中放菜的窗口能放的菜數量是有限的,咱們假設這個窗口只能放5個菜。那麼廚師在作完菜以後須要看一下窗口是否是滿了,若是窗口已經滿了的話,就在一旁抽根菸等待,直到有服務員來取菜的時候通知一下廚師窗口有了空閒,能夠放菜了,這時廚師再把本身作的菜放到窗口上去炒下一個菜。從服務員的角度來講,若是窗口是空的,那麼也去一旁抽根菸等待,直到有廚師把菜作好了放到窗口上,而且通知他們一下,而後再把菜端走。數組

咱們先用java抽象一下菜:安全

public class Food {

    private static int counter = 0;

    private int i;  //表明生產的第幾個菜

    public Food() {
        i = ++counter;
    }

    @Override
    public String toString() {
        return "第" + i + "個菜";
    }
}

每次建立Food對象,字段i的值都會加1,表明這是建立的第幾道菜。多線程

爲了故事的順利進行,咱們首先定義一個工具類:併發

class SleepUtil {

    private static Random random = new Random();

    public static void randomSleep() {
        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

SleepUtil的靜態方法randomSleep表明當前線程隨機休眠一秒內的時間。dom

而後咱們再用java定義一下廚師:ide

public class Cook extends Thread {

    private Queue<Food> queue;

    public Cook(Queue<Food> queue, String name) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            SleepUtil.randomSleep();    //模擬廚師炒菜時間

            Food food = new Food();
            System.out.println(getName() + " 生產了" + food);

            synchronized (queue) {
                while (queue.size() > 4) {
                    try {
                        System.out.println("隊列元素超過5個,爲:" + queue.size() + " " + getName() + "抽根菸等待中");
                        queue.wait();

                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                queue.add(food);
                queue.notifyAll();
            }
        }
    }
}

咱們說每個廚師Cook都是一個線程,內部維護了一個名叫queue的隊列。在run方法中是一個死循環,表明不斷的生產Food。他每生產一個Food後,都要判斷queue隊列中元素的個數是否是大於4,若是大於4的話,就調用queue.wait()等待,若是不大於4的話,就把建立號的Food對象放到queue隊列中,因爲可能多個線程同時訪問queue的各個方法,因此對這段代碼用queue對象來加鎖保護。當向隊列添加完剛建立的Food對象以後,就能夠通知queue這個鎖對象關聯的等待隊列中的服務員線程們能夠繼續端菜了。工具

而後咱們再用java定義一下服務員:this

class Waiter extends Thread {

    private Queue<Food> queue;

    public Waiter(Queue<Food> queue, String name) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {

            Food food;
            synchronized (queue) {
                while (queue.size() < 1) {
                    try {
                        System.out.println("隊列元素個數爲: " + queue.size() + "," + getName() + "抽根菸等待中");
                        queue.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                food = queue.remove();
                System.out.println(getName() + " 獲取到:" + food);
                queue.notifyAll();
            }

            SleepUtil.randomSleep();    //模擬服務員端菜時間
        }
    }
}

每一個服務員也是一個線程,和廚師同樣,都在內部維護了一個名叫queue的隊列。在run方法中是一個死循環,表明不斷的從隊列中取走Food。每次在從queue隊列中取Food對象的時候,都須要判斷一下隊列中的元素是否小於1,若是小於1的話,就調用queue.wait()等待,若是不小於1的話,也就是隊列裏有元素,就從隊列裏取走一個Food對象,而且通知與queue這個鎖對象關聯的等待隊列中的廚師線程們能夠繼續向隊列裏放入Food對象了。

在廚師和服務員線程類都定義好了以後,咱們再建立一個Restaurant類,來看看在餐館裏真實發生的事情:

public class Restaurant {

    public static void main(String[] args) {

        Queue<Food> queue = new LinkedList<>();
        new Cook(queue, "1號廚師").start();
        new Cook(queue, "2號廚師").start();
        new Cook(queue, "3號廚師").start();
        new Waiter(queue, "1號服務員").start();
        new Waiter(queue, "2號服務員").start();
        new Waiter(queue, "3號服務員").start();
    }
}

咱們在Restaurant中安排了3個廚師和3個服務員,你們執行一下這個程序,會發如今若是廚師生產的過快,廚師就會等待,若是服務員端菜速度過快,服務員就會等待。可是整個過程廚師和服務員是沒有任何關係的,它們是經過隊列queue實現了所謂的解耦。

這個過程雖然不是很複雜,可是使用中仍是須要注意一些問題:

  • 咱們這裏的廚師和服務員使用同一個鎖queue

使用同一個鎖是由於對queue的操做只能用同一個鎖來保護,假設使用不一樣的鎖,廚師線程調用queue.add方法,服務員線程調用queue.remove方法,這兩個方法都不是原子操做,多線程併發執行的時候會出現不可預測的結果,因此咱們使用同一個鎖來保護對queue這個變量的操做,這一點咱們在嘮叨設計線程安全類的時候已經強調過了。

  • 廚師和服務員線程使用同一個鎖queue的後果就是廚師線程和服務員線程使用的是同一個等待隊列。

可是同一時刻廚師線程和服務員線程不會同時在等待隊列中,由於當廚師線程在wait的時候,隊列裏的元素確定是5,此時服務員線程確定是不會wait的,可是消費的過程是被鎖對象queue保護的,因此在一個服務員線程消費了一個Food以後,就會調用notifyAll來喚醒等待隊列中的廚師線程們;當消費者線程在wait的時候,隊列裏的元素確定是0,此時廚師線程確定是不會wait的,生產的過程是被鎖對象queue保護的,因此在一個廚師線程生產了一個Food對象以後,就會調用notifyAll來喚醒等待隊列中的服務員線程們。因此同一時刻廚師線程服務員線程不會同時在等待隊列中。

  • 在生產和消費過程,咱們都調用了SleepUtil.randomSleep();。

咱們這裏的生產者-消費者模型是把實際使用的場景進行了簡化,真正的實際場景中生產過程和消費過程通常都會很耗時,這些耗時的操做最好不要放在同步代碼塊中,這樣會形成別的線程的長時間阻塞。若是把生產過程和消費過程都放在同步代碼塊中,也就是說在一個廚師炒菜的同時不容許別的廚師炒菜,在一個服務員端菜的同時不容許別的程序員端菜,這個顯然是不合理的,你們須要注意這一點。

以上就是wait/notify機制的一個現實應用:生產者-消費者模式的一個簡介。

管道輸入/輸出流

還記得在嘮叨I/O的時候提到的管道流麼,這些管道流就是用於在不一樣線程之間的數據傳輸,一共有四種管道流:

  • PipedInputStream:管道輸入字節流
  • PipedOutputStream:管道輸出字節流
  • PipedReader:管道輸入字符流
  • PipedWriter:管道輸出字符流

字節流和字符流的用法是差很少的,咱們下邊以字節流爲例來嘮叨一下管道流的用法。

一個線程能夠持有一個PipedInputStream對象,這個PipedInputStream對象在內部維護了一個字節數組,默認大小爲1024字節。它並不能單獨使用,須要與另外一個線程持有的一個PipedOutputStream創建關聯,PipedOutputStream往該字節數組中寫數據,PipedInputStream從該字節數組中讀數據,從而實現兩個線程的通訊。

PipedInputStream

先看一下它的幾個構造方法:

圖片描述
它有一個特別重要的方法就是:
圖片描述

PipedOutputStream

看一下它的構造方法:
圖片描述
它也有一個鏈接到管道輸入流的方法:
圖片描述

使用示例

管道流的一般使用場景就是一個線程持有一個PipedInputStream對象,另外一個線程持有一個PipedOutputStream對象,而後把這兩個輸入輸出管道流經過connect方法創建鏈接,此後從管道輸出流寫入的數據就能夠經過管道輸入流讀出,從而實現了兩個線程間的數據交換,也就是實現了線程間的通訊

public class PipedDemo {

    public static void main(String[] args){

        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream();

        try {
            in.connect(out);    //將輸入流和輸出流創建關聯
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        new ReadThread(in).start();
        new WriteThread(out).start();
    }
}

class ReadThread extends Thread {

    private PipedInputStream in;

    public ReadThread(PipedInputStream in) {
        this.in = in;
    }

    @Override
    public void run() {

        int i = 0;
        try {
            while ((i=in.read()) != -1) {   //從輸入流讀取數據
                System.out.println(i);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

class WriteThread extends Thread {

    private PipedOutputStream out;

    public WriteThread(PipedOutputStream out) {
        this.out = out;
    }

    @Override
    public void run() {
        byte[] bytes = {1, 2, 3, 4, 5};
        try {
            out.write(bytes);   //向輸出流寫入數據
            out.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

執行結果是:

1
2
3
4
5

join方法

咱們前邊說過這個方法,好比有代碼是這樣:

public static void main(String[] args) {

    Thread t = new Thread(new Runnable() {

        @Override
        public void run() {
            // ... 線程t執行的具體任務
        }
    }, "t");

    t.start();

    t.join();
    System.out.println("t線程執行完了,繼續執行main線程");
}

main線程中調用t.join(),表明main線程須要等待t線程執行完成後才能繼續執行。也就是說,這個join方法能夠協調各個線程之間的執行順序。它的實現其實很簡單:

public final synchronized void join() throws InterruptedException {
    while (isAlive()) {
        wait();
    }
}

須要注意的是,join方法Thread類的成員方法。上邊例子中在main線程中調用t.join()的意思就是,使用Thread對象t做爲鎖對象,若是t線程還活着,就調用wait(),把main線程放到與t對象關聯的等待隊列裏,直到t線程執行結束,系統會主動調用一下t.notifyAll(),把與t對象關聯的等待隊列中的線程所有移出,從而main線程能夠繼續執行~

固然它還有兩個指定等待時間的重載方法:
圖片描述

java線程的狀態

java爲了方便的管理線程,對底層的操做系統的線程狀態作了一些抽象封裝,定義了以下的線程狀態:
圖片描述

須要注意的是:

  • 對於在操做系統中線程的運行/就緒狀態,java語言中統一用RUNNABLE狀態來表示。
  • 對於在操做系統中線程的阻塞狀態,java語言中用BLOCKEDWAITINGTIME_WAITING這三個狀態分別表示。
  • 也就是對阻塞狀態進行了進一步細分。對於由於獲取不到鎖而產生的阻塞稱爲BLOCKED狀態,由於調用wait或者join方法而產生的阻塞稱爲WAITING狀態,由於調用有超時時間的waitjoin或者sleep方法而產生的在有限時間內阻塞稱爲TIME_WAITING狀態。

你們能夠經過這個圖來詳細的看一下各個狀態之間的轉換過程:
圖片描述

java這麼劃分線程的狀態純屬於方便本身的管理,好比它會給在WAITINGTIMED_WAITING狀態的線程分別創建不一樣的隊列,來方便實施不一樣的恢復策略~因此你們也不用糾結爲啥和操做系統中定義的不同,其實操做系統中對各個狀態的線程仍然有各類細分來方便管理,若是是你去設計一個語言或者一個操做系統,你也能夠爲了本身的方便來定義一下線程的各類狀態。咱們做爲語言的使用者,首先仍是把這些狀態記住了再說哈🙄~

獲取線程執行狀態

java中定義了一個State枚舉類型,來表示線程的狀態:

public class Thread implements Runnable {
    // ... 爲節省篇幅,省略其它方法和字段

    public enum State { 
        NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED;
    }
}

而後又在Thread類裏定義了一個成員方法:

public State getState() {
    //省略了具體的實現
}

咱們能夠經過這個getState方法來獲取到對應的線程狀態,下邊來舉個例子獲取上邊列舉的6種狀態,爲了故事的順利發展,咱們先定義一個工具類:

public class LockUtil {

    public static void sleep(long mill) {
        try {
            Thread.sleep(mill);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void wait(Object obj) {
        try {
            obj.wait();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

由於每次調用sleepwait操做的時候都會有InterruptedException的異常說明,咱們都須要try...catch一下,會致使代碼結構會很混亂,因此咱們寫了個工具類來把InterruptedException的異常轉爲運行時異常。注意,咱們這裏轉爲運行時異常只是爲了代碼結構清晰,真實狀況須要認真處理InterruptedException的異常說明,具體怎麼使用咱們後邊會詳細嘮叨。

而後接着寫具體的獲取狀態的代碼:

public class ThreadStateDemo {

    private static Object lock = new Object();  //鎖對象

    public static void main(String[] args) {

        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {

                double d = 0.1;
                int i = 0;
                while (i++ < 100000) {  //模仿一個耗時操做
                    d = d*0.3d;
                }

                SleepUtil.sleep(2000L);    //休眠2秒鐘
                synchronized (lock) {
                    LockUtil.wait(lock);
                }
                synchronized (lock) {   //嘗試獲取lock鎖

                }
            }
        }, "t");

        System.out.println("初始狀態:" + t.getState());
        t.start();
        System.out.println("運行一個耗時操做時的狀態:" + t.getState());

        SleepUtil.sleep(1000L);

        System.out.println("休眠時的狀態:" + t.getState());


        SleepUtil.sleep(2000L);

        System.out.println("wait的狀態:" + t.getState());
        synchronized (lock) {
            lock.notifyAll();
        }
        System.out.println("被notify後的狀態:" + t.getState());

        synchronized (lock) {
            SleepUtil.sleep(1000L); //調用sleep方法不會釋放鎖
            System.out.println("由於獲取鎖而阻塞的狀態:" + t.getState());
        }
    }
}

咱們在程序裏用了一系列的sleep方法來控制程序的執行順序,這只是爲了簡單的說明線程的各個狀態的產生緣由,在真實環境中是不容許使用sleep方法來控制線程間的執行順序的,應該使用同步或者咱們上邊介紹的一系列線程通訊的方式。這個程序的執行結果是:

初始狀態:NEW
運行一個耗時操做時的狀態:RUNNABLE
休眠時的狀態:TIMED_WAITING
wait的狀態:WAITING
被notify後的狀態:BLOCKED
由於獲取鎖而阻塞的狀態:TERMINATED
線程的各個狀態都獲取到了哈。

總結

  1. 基於wait/notify機制的生產者-消費者模式很重要,務必認真看幾遍~
  2. 一個線程能夠持有一個PipedInputStream對象,這個PipedInputStream對象在內部維護了一個字節數組,默認大小爲1024字節。它並不能單獨使用,須要與另外一個線程持有的一個PipedOutputStream創建關聯,PipedOutputStream往該字節數組中寫數據,PipedInputStream從該字節數組中讀數據,從而實現兩個線程的通訊。
  3. 使用join方法能夠實現一個線程在另外一個線程執行完畢後才繼續執行的功能。
  4. java爲了方便的管理線程,對底層的操做系統的線程狀態作了一些抽象封裝,定義了NEWRUNNABLEBLOCKEDWAITINGTIME_WAITINGTERMINATED這些線程狀態,與操做系統中的線程有一些區別:
  • 對於在操做系統中線程的運行/就緒狀態,java語言中統一用RUNNABLE狀態來表示。
  • 對於在操做系統中線程的阻塞狀態,java語言中用BLOCKEDWAITINGTIME_WAITING這三個狀態分別表示。

題外話

寫文章挺累的,有時候你以爲閱讀挺流暢的,那實際上是背後無數次修改的結果。若是你以爲不錯請幫忙轉發一下,萬分感謝~

相關文章
相關標籤/搜索