生產者消費者問題是研究多線程程序時繞不開的經典問題之一,它描述是有一塊緩衝區做爲倉庫,生產者能夠將產品放入倉庫,消費者則能夠從倉庫中取走產品。解決生產者/消費者問題的方法可分爲兩類:(1)採用某種機制保護生產者和消費者之間的同步;(2)在生產者和消費者之間創建一個管道。第一種方式有較高的效率,而且易於實現,代碼的可控制性較好,屬於經常使用的模式。第二種管道緩衝區不易控制,被傳輸數據對象不易於封裝等,實用性不強。java
同步問題核心在於:如何保證同一資源被多個線程併發訪問時的完整性。經常使用的同步方法是採用信號或加鎖機制,保證資源在任意時刻至多被一個線程訪問。Java語言在多線程編程上實現了徹底對象化,提供了對同步機制的良好支持。在Java中一共有五種方法支持同步,其中前四個是同步方法,一個是管道方法。算法
wait() / nofity()方法是基類Object的兩個方法,也就意味着全部Java類都會擁有這兩個方法,這樣,咱們就能夠爲任何對象實現同步機制。數據庫
wait()方法:當緩衝區已滿/空時,生產者/消費者線程中止本身的執行,放棄鎖,使本身處於等等狀態,讓其餘線程執行。編程
notify()方法:當生產者/消費者向緩衝區放入/取出一個產品時,向其餘等待的線程發出可執行的通知,同時放棄鎖,使本身處於等待狀態。數據結構
各起了4個生產者,4個消費者 多線程
package test; /* * 生產者消費者問題(ReentrantLock) * 庫存爲空時,生產者生產,消費者等待 * 庫存盡是,消費者消費,生產者等待 */ public class Hosee { private static Integer count = 0; private final Integer FULL = 10; private static String LOCK = "LOCK"; class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count == FULL) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); //每生產一個喚醒消費者消費 LOCK.notifyAll(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); //沒消費一個喚醒生產者生產 LOCK.notifyAll(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
(須要注意的是,用什麼加鎖就用什麼notify和wait,實例中使用的是LOCK)併發
部分打印結果:框架
因爲生產者和消費者說明一致,因此最多都是在2左右,當減小一個消費者時,則會加到10。dom
首先,咱們先來看看await()/signal()與wait()/notify()的區別:ide
那麼爲何有了synchronized還要提出Lock呢?
synchronized並不完美,它有一些功能性的限制 —— 它沒法中斷一個正在等候得到鎖的線程,也沒法經過投票獲得鎖,若是不想等下去,也就無法獲得鎖。同步還要求鎖的釋放只能在與得到鎖所在的堆棧幀相同的堆棧幀中進行,多數狀況下,這沒問題(並且與異常處理交互得很好),可是,確實存在一些非塊結構的鎖定更合適的狀況。
java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,它容許把鎖定的實現做爲 Java 類,而不是做爲語言的特性來實現(更加面向對象)。這就爲 Lock 的多種實現留下了空間,各類實現可能有不一樣的調度算法、性能特性或者鎖定語義。 ReentrantLock 類實現了 Lock ,它擁有與 synchronized 相同的併發性和內存語義,可是添加了相似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用狀況下更佳的性能。(換句話說,當許多線程都想訪問共享資源時,JVM 能夠花更少的時候來調度線程,把更多時間用在執行線程上。)
reentrant 鎖意味着什麼呢?簡單來講,它有一個與鎖相關的獲取計數器,若是擁有鎖的某個線程再次獲得鎖,那麼獲取計數器就加1,而後鎖須要被釋放兩次才能得到真正釋放(重入鎖)。這模仿了 synchronized 的語義;若是線程進入由線程已經擁有的監控器保護的 synchronized 塊,就容許線程繼續進行,當線程退出第二個(或者後續) synchronized 塊的時候,不釋放鎖,只有線程退出它進入的監控器保護的第一個synchronized 塊時,才釋放鎖。
簡單解釋下重入鎖:
public class Child extends Father implements Runnable{ final static Child child = new Child();//爲了保證鎖惟一 public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(child).start(); } } public synchronized void doSomething() { System.out.println("1child.doSomething()"); doAnotherThing(); // 調用本身類中其餘的synchronized方法 } private synchronized void doAnotherThing() { super.doSomething(); // 調用父類的synchronized方法 System.out.println("3child.doAnotherThing()"); } @Override public void run() { child.doSomething(); } } class Father { public synchronized void doSomething() { System.out.println("2father.doSomething()"); } }
上述代碼的鎖都是child對象,當執行child.doSomething時,該線程得到child對象的鎖,在doSomething方法內執行doAnotherThing時再次請求child對象的鎖,由於synchronized是重入鎖,因此能夠獲得該鎖,繼續在doAnotherThing裏執行父類的doSomething方法時第三次請求child對象的鎖,同理可獲得,若是不是重入鎖的話,那這後面這兩次請求鎖將會被一直阻塞,從而致使死鎖。
在查看下面代碼示例時,能夠看到 Lock 和 synchronized 有一點明顯的區別 —— lock 必須在 finally 塊中釋放。不然,若是受保護的代碼將拋出異常,鎖就有可能永遠得不到釋放!這一點區別看起來可能沒什麼,可是實際上,它極爲重要。忘記在 finally 塊中釋放鎖,可能會在程序中留下一個定時炸彈,當有一天炸彈爆炸時,您要花費很大力氣纔有找到源頭在哪。而使用同步,JVM 將確保鎖會得到自動釋放。
Lock lock = new ReentrantLock(); lock.lock(); try { // update object state } finally { lock.unlock(); }
除此以外,與目前的 synchronized 實現相比,爭用下的 ReentrantLock 實現更具可伸縮性。(在將來的 JVM 版本中,synchronized 的爭用性能頗有可能會得到提升。)這意味着當許多線程都在爭用同一個鎖時,使用 ReentrantLock 的整體開支一般要比 synchronized 少得多。
在 Java1.5 中,synchronized 是性能低效的。由於這是一個重量級操做,須要調用操做接口,致使有可能加鎖消耗的系統時間比加鎖之外的操做還多。相比之下使用 Java 提供的 Lock 對象,性能更高一些。可是到了 Java1.6,發生了變化。synchronized 在語義上很清晰,能夠進行不少優化,有適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。致使在 Java1.6 上 synchronized 的性能並不比 Lock 差。官方也表示,他們也更支持 synchronized,在將來的版本中還有優化餘地。
因此在確實須要一些 synchronized 所沒有的特性的時候,好比時間鎖等候、可中斷鎖等候、無塊結構鎖、多個條件變量或者鎖投票使用ReentrantLock。ReentrantLock 還具備可伸縮性的好處,應當在高度爭用的狀況下使用它,可是請記住,大多數 synchronized 塊幾乎歷來沒有出現過爭用,因此能夠把高度爭用放在一邊。我建議用 synchronized 開發,直到確實證實 synchronized 不合適,而不要僅僅是假設若是使用 ReentrantLock 「性能會更好」。請記住,這些是供高級用戶使用的高級工具。(並且,真正的高級用戶喜歡選擇可以找到的最簡單工具,直到他們認爲簡單的工具不適用爲止。)。一如既往,首先要把事情作好,而後再考慮是否是有必要作得更快。
package test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Hosee { private static Integer count = 0; private final Integer FULL = 10; final Lock lock = new ReentrantLock(); final Condition NotFull = lock.newCondition(); final Condition NotEmpty = lock.newCondition(); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } lock.lock(); try { while (count == FULL) { try { NotFull.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); NotEmpty.signal(); } finally { lock.unlock(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } lock.lock(); try { while (count == 0) { try { NotEmpty.await(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); NotFull.signal(); } finally { lock.unlock(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
運行結果與第一個相似。上述代碼用了兩個Condition,其實用一個也是能夠的,只不過要signalall()。
put()方法:相似於咱們上面的生產者線程,容量達到最大時,自動阻塞。
take()方法:相似於咱們上面的消費者線程,容量爲0時,自動阻塞。
package test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Hosee { private static Integer count = 0; final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { bq.put(1); count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { bq.take(); count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
其實這個BlockingQueue比較難用代碼來演示,由於put()與take()方法沒法與輸出語句保證同步,固然你能夠本身去實現 BlockingQueue(BlockingQueue是用await()/signal() 實現的)。因此在輸出結果上你會發現不匹配。
例如:當緩衝區已滿,生產者在put()操做時,put()內部調用了await()方法,放棄了線程的執行,而後消費者線程執行,調用take()方法,take()內部調用了signal()方法,通知生產者線程能夠執行,導致在消費者的println()還沒運行的狀況下生產者的println()先被執行,因此有了輸出不匹配的狀況。
對於BlockingQueue你們能夠放心使用,這可不是它的問題,只是在它和別的對象之間的同步有問題。
Semaphore 信號量,就是一個容許實現設置好的令牌。也許有1個,也許有10個或更多。
誰拿到令牌(acquire)就能夠去執行了,若是沒有令牌則須要等待。
執行完畢,必定要歸還(release)令牌,不然令牌會被很快用光,別的線程就沒法得到令牌而執行下去了。
package test; import java.util.concurrent.Semaphore; public class Hosee { int count = 0; final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { notFull.acquire();//順序不能顛倒,不然會形成死鎖。 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notEmpty.release(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { notEmpty.acquire();//順序不能顛倒,不然會形成死鎖。 mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notFull.release(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
注意notFull.acquire()與mutex.acquire()的位置不能互換,若是先獲得互斥鎖再發生等待,會形成死鎖。
package test; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; public class Hosee { final PipedInputStream pis = new PipedInputStream(); final PipedOutputStream pos = new PipedOutputStream(); { try { pis.connect(pos); } catch (IOException e) { e.printStackTrace(); } } class Producer implements Runnable { @Override public void run() { try{ while(true){ int b = (int) (Math.random() * 255); System.out.println("Producer: a byte, the value is " + b); pos.write(b); pos.flush(); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } } } class Consumer implements Runnable { @Override public void run() { try{ while(true){ int b = pis.read(); System.out.println("Consumer: a byte, the value is " + String.valueOf(b)); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
與阻塞隊列同樣,因爲read()/write()方法與輸出方法不必定同步,輸出結果方面會發生不匹配現象,爲了使結果更加明顯,這裏只有1個消費者和1個生產者。
讀者—寫者問題(Readers-Writers problem)也是一個經典的併發程序設計問題,是常常出現的一種同步問題。計算機系統中的數據(文件、記錄)常被多個進程共享,但其中某些進程可能只要求讀數據(稱爲讀者Reader);另外一些進程則要求修改數據(稱爲寫者Writer)。就共享數據而言,Reader和Writer是兩組併發進程共享一組數據區,要求:
(1)容許多個讀者同時執行讀操做;
(2)不容許讀者、寫者同時操做;
(3)不容許多個寫者同時操做。
Reader和Writer的同步問題分爲讀者優先、弱寫者優先(公平競爭)和強寫者優先三種狀況,它們的處理方式不一樣。
首先咱們都只考慮公平競爭的狀況下,看看Java有哪些方法能夠實現讀者寫者問題
ReentrantReadWriteLock會使用兩把鎖來解決問題,一個讀鎖,一個寫鎖
線程進入讀鎖的前提條件:
沒有其餘線程的寫鎖,
沒有寫請求或者有寫請求,但調用線程和持有鎖的線程是同一個
線程進入寫鎖的前提條件:
沒有其餘線程的讀鎖
沒有其餘線程的寫鎖
到ReentrantReadWriteLock,首先要作的是與ReentrantLock劃清界限。它和後者都是單獨的實現,彼此之間沒有繼承或實現的關係。而後就是總結這個鎖機制的特性了:
看下ReentrantReadWriteLock這個類的兩個構造函數
public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { sync = (fair)? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
fair這個參數表示是不是建立一個公平的讀寫鎖,仍是非公平的讀寫鎖。也就是搶佔式仍是非搶佔式。
公平和非公平:公平表示獲取的鎖的順序是按照線程加鎖的順序來分配獲取到鎖的線程時最早加鎖的線程,是按照FIFO的順序來分配鎖的;非公平表示獲取鎖的順序是無需的,後來加鎖的線程可能先得到鎖,這種狀況就致使某些線程可能一直沒獲取到鎖。
公平鎖爲啥會影響性能,從code上來看看公平鎖僅僅是多了一項檢查是否在隊首會影響性能,如不是,那麼又是在什麼地方影響的?假如是闖入的線程,會排在隊尾並睡覺(parking)等待前任節點喚醒,這樣勢必會比非公平鎖添加不少paking和unparking的操做
通常的應用場景是: 若是有多個讀線程,一個寫線程,並且寫線程在操做的時候須要阻塞讀線程,那麼此時就須要使用公平鎖,要否則可能寫線程一直獲取不到鎖,致使線程餓死。
再簡單說下鎖降級
重入還容許從寫入鎖降級爲讀取鎖,其實現方式是:先獲取寫入鎖,而後獲取讀取鎖,最後釋放寫入鎖。可是,從讀取鎖升級到寫入鎖是不可能的。
rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); if (!cacheValid) { data = ... cacheValid = true; } rwl.readLock().lock(); rwl.writeLock().unlock(); // 降級:先獲取讀鎖再釋放寫鎖 }
下面咱們用讀寫鎖來實現讀者寫者問題
import java.util.Random; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { q3.get(); } } }.start(); } for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3 { private Object data = null;// 共享數據,只能有一個線程能寫該數據,但能夠有多個線程同時讀該數據。 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void get() { rwl.readLock().lock();// 上讀鎖,其餘線程只能讀不能寫 System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); rwl.readLock().unlock(); // 釋放讀鎖,最好放在finnaly裏面 } public void put(Object data) { rwl.writeLock().lock();// 上寫鎖,不容許其餘線程讀也不容許寫 System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); rwl.writeLock().unlock();// 釋放寫鎖 } }
運行結果:
Thread-0 be ready to read data! Thread-1 be ready to read data! Thread-2 be ready to read data! Thread-0have read data :null Thread-2have read data :null Thread-1have read data :null Thread-5 be ready to write data! Thread-5 have write data: 6934 Thread-5 be ready to write data! Thread-5 have write data: 8987 Thread-5 be ready to write data! Thread-5 have write data: 8496
在1.4中已經介紹了用信號量來實現生產者消費者問題,如今咱們將用信號量來實現讀者寫者問題,信號量的相關知識再也不重複,直接看代碼
package test; import java.util.Random; import java.util.concurrent.Semaphore; public class ReadWrite { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } q3.get(); } } }.start(); } for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3 { private Object data = null;// 共享數據,只能有一個線程能寫該數據,但能夠有多個線程同時讀該數據。 private Semaphore wmutex = new Semaphore(1); private Semaphore rmutex = new Semaphore(2); private int count = 0; public void get() { try { rmutex.acquire(); if (count == 0) wmutex.acquire();// 當第一讀進程欲讀數據庫時,阻止寫進程寫 count++; System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); count--; if (count == 0) wmutex.release(); rmutex.release(); } catch (Exception e) { e.printStackTrace(); } } public void put(Object data) { try { wmutex.acquire(); System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); } catch (Exception e) { e.printStackTrace(); } finally { wmutex.release(); } } }
單純使用信號量不能解決讀者與寫者問題,必須引入計數器count(能夠用CountDownLatch代替 )對讀進程計數;count與wmutex結合使用,使讀讀能同時進行,讀寫排斥。count爲0時表示讀進程開始,此時寫進程阻塞(wmutex被讀進程獲取),當count不爲0時,表示有多個讀進程,就不用操做 wmutex了,由於第一個讀進程已經得到了wmutex。count表示有多少個讀進程在讀,每次有一個就+1,讀完了-1,當count==0時,表示讀進程都結束了。此時wmutex釋放,寫進程纔有機會得到wmutex。爲了使讀進程不要一直佔有 wmutex,最好讓讀進程sleep一下,讓寫進程有機會得到wmutex,使效果更明顯。
就此用Java實現生產者消費者問題(5種)和讀者寫者問題(2種)已經闡述完了,歡迎你們討論以及給出不一樣的解決方案。