生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個存儲空間,生產者往存儲空間中添加產品,消費者從存儲空間中取走產品,當存儲空間爲空時,消費者阻塞,當存儲空間滿時,生產者阻塞。java
生產者消費者.png安全
如今用四種方式來實現生產者消費者模型多線程
這也是最簡單最基礎的實現,緩衝區滿和爲空時都調用wait()方法等待,當生產者生產了一個產品或者消費者消費了一個產品以後會喚醒全部線程。框架
/** * 生產者和消費者,wait()和notify()的實現 * @author ZGJ * @date 2017年6月22日 */ public class Test1 { private static Integer count = 0; private static final Integer FULL = 10; private static String LOCK = "lock"; public static void main(String[] args) { Test1 test1 = new Test1(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); } class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count == FULL) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); LOCK.notifyAll(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); LOCK.notifyAll(); } } } } }
結果:dom
Thread-0生產者生產,目前總共有1 Thread-4生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-2生產者生產,目前總共有1 Thread-6生產者生產,目前總共有2 Thread-7消費者消費,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-4生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-6生產者生產,目前總共有2 Thread-1消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-2生產者生產,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-4生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-2生產者生產,目前總共有2 Thread-1消費者消費,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-4生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-2生產者生產,目前總共有1
java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,經過對lock的lock()方法和unlock()方法實現了對鎖的顯示控制,而synchronize()則是對鎖的隱性控制。
可重入鎖,也叫作遞歸鎖,指的是同一線程 外層函數得到鎖以後 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響,簡單來講,該鎖維護這一個與獲取鎖相關的計數器,若是擁有鎖的某個線程再次獲得鎖,那麼獲取計數器就加1,函數調用結束計數器就減1,而後鎖須要被釋放兩次才能得到真正釋放。已經獲取鎖的線程進入其餘須要相同鎖的同步代碼塊不會被阻塞。ide
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 生產者和消費者,ReentrantLock的實現 * * @author ZGJ * @date 2017年6月22日 */ public class Test2 { private static Integer count = 0; private static final Integer FULL = 10; //建立一個鎖對象 private Lock lock = new ReentrantLock(); //建立兩個條件變量,一個爲緩衝區非滿,一個爲緩衝區非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args) { Test2 test2 = new Test2(); new Thread(test2.new Producer()).start(); new Thread(test2.new Consumer()).start(); new Thread(test2.new Producer()).start(); new Thread(test2.new Consumer()).start(); new Thread(test2.new Producer()).start(); new Thread(test2.new Consumer()).start(); new Thread(test2.new Producer()).start(); new Thread(test2.new Consumer()).start(); } class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } //獲取鎖 lock.lock(); try { while (count == FULL) { try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); //喚醒消費者 notEmpty.signal(); } finally { //釋放鎖 lock.unlock(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } lock.lock(); try { while (count == 0) { try { notEmpty.await(); } catch (Exception e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); notFull.signal(); } finally { lock.unlock(); } } } } }
BlockingQueue即阻塞隊列,從阻塞這個詞能夠看出,在某些狀況下對阻塞隊列的訪問可能會形成阻塞。
被阻塞的狀況主要有以下兩種:函數
操做 | 拋異常 | 特定值 | 阻塞 | 超時 | |
---|---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) | |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) | |
檢查 | element(o) | peek(o) |
這四類方法分別對應的是:
1 . ThrowsException:若是操做不能立刻進行,則拋出異常
2 . SpecialValue:若是操做不能立刻進行,將會返回一個特殊的值,通常是true或者false
3 . Blocks:若是操做不能立刻進行,操做會被阻塞
4 . TimesOut:若是操做不能立刻進行,操做會被阻塞指定的時間,若是指定時間沒執行,則返回一個特殊值,通常是true或者false
下面來看由阻塞隊列實現的生產者消費者模型,這裏咱們使用take()和put()方法,這裏生產者和生產者,消費者和消費者之間不存在同步,因此會出現連續生成和連續消費的現象ui
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 使用BlockingQueue實現生產者消費者模型 * @author ZGJ * @date 2017年6月29日 */ public class Test3 { private static Integer count = 0; //建立一個阻塞隊列 final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { Test3 test3 = new Test3(); new Thread(test3.new Producer()).start(); new Thread(test3.new Consumer()).start(); new Thread(test3.new Producer()).start(); new Thread(test3.new Consumer()).start(); new Thread(test3.new Producer()).start(); new Thread(test3.new Consumer()).start(); new Thread(test3.new Producer()).start(); new Thread(test3.new Consumer()).start(); } class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { blockingQueue.take(); count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源,在操做系統中是一個很是重要的問題,能夠用來解決哲學家就餐問題。Java中的Semaphore維護了一個許可集,一開始先設定這個許可集的數量,可使用acquire()方法得到一個許可,當許可不足時會被阻塞,release()添加一個許可。
在下列代碼中,還加入了另一個mutex信號量,維護生產者消費者之間的同步關係,保證生產者和消費者之間的交替進行spa
import java.util.concurrent.Semaphore; /** * 使用semaphore信號量實現 * @author ZGJ * @date 2017年6月29日 */ public class Test4 { private static Integer count = 0; //建立三個信號量 final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1); public static void main(String[] args) { Test4 test4 = new Test4(); new Thread(test4.new Producer()).start(); new Thread(test4.new Consumer()).start(); new Thread(test4.new Producer()).start(); new Thread(test4.new Consumer()).start(); new Thread(test4.new Producer()).start(); new Thread(test4.new Consumer()).start(); new Thread(test4.new Producer()).start(); new Thread(test4.new Consumer()).start(); } class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } try { notFull.acquire(); mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); notEmpty.release(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); notFull.release(); } } } } }
在java的io包下,PipedOutputStream和PipedInputStream分別是管道輸出流和管道輸入流。
它們的做用是讓多線程能夠經過管道進行線程間的通信。在使用管道通訊時,必須將PipedOutputStream和PipedInputStream配套使用。
使用方法:先建立一個管道輸入流和管道輸出流,而後將輸入流和輸出流進行鏈接,用生產者線程往管道輸出流中寫入數據,消費者在管道輸入流中讀取數據,這樣就能夠實現了不一樣線程間的相互通信,可是這種方式在生產者和生產者、消費者和消費者之間不能保證同步,也就是說在一個生產者和一個消費者的狀況下是能夠生產者和消費者之間交替運行的,多個生成者和多個消費者者之間則不行操作系統
/** * 使用管道實現生產者消費者模型 * @author ZGJ * @date 2017年6月30日 */ public class Test5 { final PipedInputStream pis = new PipedInputStream(); final PipedOutputStream pos = new PipedOutputStream(); { try { pis.connect(pos); } catch (IOException e) { e.printStackTrace(); } } class Producer implements Runnable { @Override public void run() { try { while(true) { Thread.sleep(1000); int num = (int) (Math.random() * 255); System.out.println(Thread.currentThread().getName() + "生產者生產了一個數字,該數字爲: " + num); pos.write(num); pos.flush(); } } catch (Exception e) { e.printStackTrace(); } finally { try { pos.close(); pis.close(); } catch (IOException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { try { while(true) { Thread.sleep(1000); int num = pis.read(); System.out.println("消費者消費了一個數字,該數字爲:" + num); } } catch (Exception e) { e.printStackTrace(); } finally { try { pos.close(); pis.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) { Test5 test5 = new Test5(); new Thread(test5.new Producer()).start(); new Thread(test5.new Consumer()).start(); } }
做者:Kevin_ZGJ 連接:https://www.jianshu.com/p/66e8b5ab27f6 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。