《圖解Java多線程設計模式》之六:Producer-Consumer模式

一,Producer-Consumer模式java

Producer:生產者的意思,指的是生成數據的線程。
Consumer:消費者的意思,指的是使用數據的線程
當生產者和消費者以不一樣的線程運行時,二者之間的處理速度差別就會引發問題。好比,消費者想獲取數據,但是數據尚未生成。
或者生產者想要交付數據,而消費者的狀態還沒法接收數據。
Producer-Consumer模式在生產者消費者之間加入了一個橋樑角色。該橋樑角色用於消除線程間處理速度的差別。
在該模式中,生產者和消費者都有多個,當消費者和生產者都只有一個時,稱之爲Pipe模式數組

二,例子安全

/** * Table類用於表示放置蛋糕的桌子. * 存放蛋糕的容器是數組。因此放置蛋糕是有順序的,拿取蛋糕也是有順序的。 */
public class Table { private final String[] buffer; //實際存放蛋糕的容器
    private int tail;//下次put的位置
    private int head;//下次take的位置
    private int count;//buffer中蛋糕的個數

    public Table(int count) { this.buffer = new String[count]; this.tail = 0; this.head = 0; this.count = 0; } //放置蛋糕
    public synchronized void put(String cake)throws InterruptedException{ System.out.println(Thread.currentThread().getName()+" put "+cake); while (count >= buffer.length){ wait(); } buffer[tail] = cake; tail = (tail+1)%buffer.length; count++; notifyAll(); } //拿取蛋糕
    public synchronized String take()throws InterruptedException{ while (count <= 0){ wait(); } String cake = buffer[head]; head = (head+1)%buffer.length; count--; notifyAll(); System.out.println(Thread.currentThread().getName()+" takes "+cake); return cake; } }
public class MakerThread extends Thread { private final Random random; private final Table table; private static int id =0;//蛋糕的流水號

    public MakerThread(String name, Table table, long seed) { super(name); this.random = new Random(seed); this.table = table; } @Override public void run() { try { while (true){ Thread.sleep(random.nextInt(1000)); String cake = "[ Cake NO."+nextId() +" by "+getName()+"]"; table.put(cake); } }catch (InterruptedException e){ } } private static synchronized int nextId(){ return id++; } }
public class EaterThread extends Thread { private final Random random; private final Table table; public EaterThread(String name, Table table, long seed) { super(name); this.random = new Random(seed); this.table = table; } @Override public void run() { try { while (true){ String cake = table.take(); Thread.sleep(random.nextInt(1000)); } }catch (InterruptedException e){ } } }
public class Test { public static void main(String[] args) { Table table = new Table(3); new MakerThread("MakerThread-1",table,31415).start(); new MakerThread("MakerThread-2",table,92653).start(); new MakerThread("MakerThread-3",table,58979).start(); new EaterThread("EaterThread-1",table,32384).start(); new EaterThread("EaterThread-2",table,62643).start(); new EaterThread("EaterThread-3",table,38327).start(); } }

三,InterruptedException異常數據結構

1.加了InterruptedException 的方法
java.lang.Object類的wait方法
java.lang.Thread類的sleep方法
java.lang.Thread類的的join方法
2.加了 throws InterruptedException 的方法可能會花費時間,可是能夠取消
花費時間體如今:
執行wait方法,須要等待notify/notifyAll方法喚醒,須要花費時間
執行sleep方法,會暫停執行,這也花費時間
執行join方法,會等待指定線程終止,須要時間
能夠取消體如今:
假如Alice線程執行了Thread.sleep(100000);咱們能夠在別的線程中使用 alice.interrupt();當執行了interrupt後,正在sleep的線程會終止暫停狀態,alice線程拋出
InterruptedException異常。這樣線程Alice的控制權就會轉移到捕獲該異常的catch語句塊中。
3.interrupt方法只是改變了中斷狀態
上面調用interrupt後,線程拋出InterruptedException異常,只是由於alice線程調用了線程中斷的方法(wait,sleep,join)。
當線程執行普通的邏輯處理時,即便別的線程調用alice.interrupt(); 也不會拋出異常,而是繼續執行。只有當線程繼續執行到sleep,wait,join等方法的調用時,
纔會拋出InterruptedException異常
4,其餘線程執行alice.interrupt()時,並不須要獲取alice線程實例的鎖,不管什麼時候,任何線程均可以調用其餘線程的interrupt方法
5.notify和interrupt的區別:
??????????dom

四,isInterrupted方法:檢查中斷狀態ide

isInterrupted是Thread類的實例方法,用於檢查指定線程的中斷狀態,
若指定線程處於中斷狀態:返回true,
未處於中斷狀態:返回false函數

五,Thread.Interrupted:檢查並清除中斷狀態this

該方法是Thread類的靜態方法,用於檢查並清除當前線程的中斷狀態(操做對象是線程自己)
若當前線程處於中斷狀態,返回true。而且當前線程的中斷狀態會被清楚。
若當前線程未處於中斷狀態,返回false。spa

六,java.util.concurrent包中的隊列線程

1.BlockingQueue接口:阻塞隊列
該接口表示在達到合適的狀態以前線程一直阻塞(wait)的隊列。實現類:
1).ArrayBlockingQueue類:基於數組的 BlockingQueue
元素個數有限的BlockingQueue
2).LinkedBlockingQueue類:基於鏈表的BlockingQueue
元素個數沒有最大限制的,只要有內存,就能夠一直put數據
3).PriorityBlockingQueue類:帶有優先級的BlockingQueue
數據的優先級是根據Comparable接口的天然排序,或構造函數的Comparator接口決定的順序指定的
4).SynchronousQueue類:直接傳遞的BlockingQueue
該類用於執行由Producer角色到Consumer角色的 直接傳遞。若是Producer角色先put,在Consumer角色take以前,Producer角色的線程將一直阻塞。
反之同樣阻塞
2.ConcurrentLinkedQueue類:元素個數沒有最大限制的線程安全隊列
ConcurrentLinkedQueue中,內部數據結構是分開的,線程之間互不影響,因此就不須要進行互斥處理
3.使用ArrayBlockingQueue替代上面實例程序中的Table類
代碼:

/** * 使用阻塞隊列完成table的功能 */
public class BlockQueueTable extends ArrayBlockingQueue<String>{ public BlockQueueTable(int capacity) { super(capacity); } public void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName()+" puts "+cake); super.put(cake); } public String take() throws InterruptedException { String cake = super.take(); System.out.println(Thread.currentThread().getName()+" takes "+cake); return cake; } }

 七,java.util.concurrent.Exchanger類

該類用來交換兩個線程的數據,只有當兩個線程都準備好了,纔會進行交換。否則其中一個線程會一直等待另外一個線程

public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); service.execute(new Runnable() { @Override public void run() { try { String data1 = "aaa"; System.out.println("線程"+Thread.currentThread().getName()+
                    "正在準備把 "+data1+"換出去"); Thread.sleep(new Random().nextInt(3000)); String data2 = (String) exchanger.exchange(data1); System.out.println("線程"+Thread.currentThread().getName()+
                    "換回的數據爲"+data2); }catch (InterruptedException e){ } } }); service.execute(new Runnable() { @Override public void run() { try { String data1 = "bbb"; System.out.println("線程"+Thread.currentThread().getName()+
                            "正在準備把 "+data1+"換出去"); Thread.sleep(new Random().nextInt(3000)); String data2 = (String) exchanger.exchange(data1); System.out.println("線程"+Thread.currentThread().getName()+
                            "換回的數據爲"+data2); }catch (InterruptedException e){ } } }); } }

 運行結果:

線程pool-1-thread-1正在準備把 aaa換出去線程pool-1-thread-2正在準備把 bbb換出去線程pool-1-thread-2換回的數據爲aaa線程pool-1-thread-1換回的數據爲bbb

相關文章
相關標籤/搜索