多線程的難點主要就是多線程通訊協做這一塊了,前面筆記二中提到了常見的同步方法,這裏主要是進行實例學習了,今天總結了一下3個實例:html
一、銀行存款與提款多線程實現,使用Lock鎖和條件Condition。 附加 : 用監視器進行線程間通訊java
二、生產者消費者實現,使用LinkedList自寫緩衝區。linux
三、多線程之阻塞隊列學習,用阻塞隊列快速實現生產者消費者模型。 附加:用布爾變量關閉線程數組
在三種線程同步方法中,咱們這裏的實例用Lock鎖來實現變量同步,由於它比較靈活直觀。緩存
實現了變量的同步,咱們還要讓多個線程之間進行「通話」,就是一個線程完成了某個條件以後,告訴其餘線程我完成了這個條件,大家能夠行動了。下面就是java提供的條件接口Condition定義的同步方法:安全
很方便的是,java的Lock鎖裏面提供了newConditon()方法能夠,該方法返回:一個綁定了lock鎖的Condition實例,有點抽象,其實把它看做一個能夠發信息的鎖就能夠了,看後面的代碼,應該就能理解了。多線程
咱們模擬ATM機器存款與提款,建立一個帳戶類Account(),該類包含同步方法:併發
存款方法:deposit()dom
提款方法:withdraw()ide
以及一個普通的查詢餘額的方法getbalance().
咱們建立兩個任務線程,分別調用兩個同步方法,進行模擬操做,看代碼:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThreadCooperation { private static Account account = new Account(); public static void main(String[] args) { //建立線程池 ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new DepositTask()); executor.execute(new WithdrawTask()); } //存錢 public static class DepositTask implements Runnable { @Override public void run() { try { while(true) { account.deposit((int)(Math.random()*1000)+1); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static class WithdrawTask implements Runnable { @Override public void run() { try{ while(true) { account.withdraw((int)(Math.random()*1000)+1); Thread.sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static class Account { //一個鎖是一個Lock接口的實例 它定義了加鎖和釋放鎖的方法 ReentrantLock是爲建立相互排斥的鎖的Lock的具體實現 private static Lock lock = new ReentrantLock(); //建立一個condition,具備發通知功能的鎖,前提是要實現了lock接口 private static Condition newDeposit = lock.newCondition(); private int balance = 0; public int getBalance() { return balance; } public void withdraw(int amount) { lock.lock(); try { while(balance < amount) { System.out.println("\t\t錢不夠,等待存錢"); newDeposit.await(); } balance -= amount; System.out.println("\t\t取出"+amount+"塊錢\t剩餘"+getBalance()); } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } } public void deposit(int amount) { lock.lock(); try{ balance+=amount; System.out.println("存入"+amount+"塊錢"); newDeposit.signalAll(); //發信息喚醒全部的線程 } finally{ lock.unlock(); } } } }
分析:
一、程序中須要注意的:建立一個condition,具備發通知功能的鎖,前提是要實現了lock接口。
二、while(balance < amount)不能改用if判斷,用if會使得線程不安全,使用if會不會進行循環驗證,而while會,咱們常常看到while(true),可是不會常常看到if(true).
三、調用了await方法後,要記得使用signalAll()或者signal()將線程喚醒,不然線程永久等待。
最後再來分析一下這個類的結構,有3個類,兩個靜態任務類實現了Runnable接口,是線程類,而另一個類則是普通的任務類,包含了線程類所用到的方法。咱們的主類在main方法前面就實例化一個Account類,以供線程類調用該類裏面的同步方法。
這種構造方式是多線程經常使用到的一種構造方式吧。不難發現後面要手寫的生產者消費者模型也是這樣子構造的。這至關因而一個多線程模板。也是咱們學習這個例子最重要的收穫吧。
還有一點,接口Lock與Condition都是在java5以後出現的,在這以前,線程通訊是經過內置的監視器(monitor)實現的。
監視器是一個相互排斥且具備同步能力的對象,任意對象都有可能成爲一個monitor。監視器是經過synchronized關鍵字來對本身加鎖(加鎖解鎖是解決線程同步最基本的思想),使用wait()方法時線程暫停並 等待條件發生,發通知則是經過notify()和notifyAll()方法。大致的模板是這樣子的:
不難看出await()、signal()、signally()是wait()、notify()、notifyAll()的進化形態,因此不建議使用監視器。
這個模型一直很經典,學操做系統的時候還學過,記得linux還用PV操做去實現它,不過這東西是跨學科的。
考慮緩存區buffer的使用者,生產者和消費者,他們都能識別緩衝區是否滿的,且兩種各只能發出一種信號:
生產者:它能發出notEmpty()信號,即緩衝區非空信號,當它看到緩衝區滿的時候,它就調用await等待。
消費者:它能發出notFull()信號,即緩衝區未滿的信號,當它看到緩衝區空的時候,它也調用await等待。
看代碼:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //生產者消費者 public class ConsumerProducer { private static Buffer buffer= new Buffer(); public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new ProducerTask()); executor.execute(new ConsumerTask()); executor.shutdown(); } public static class ProducerTask implements Runnable { @Override public void run() { int i=1; try { while(true) { System.out.println("生產者寫入數據"+i); buffer.write(i++); Thread.sleep((int)(Math.random()*80)); } }catch (InterruptedException e) { e.printStackTrace(); } } } public static class ConsumerTask implements Runnable { public void run() { try { while(true){ System.out.println("\t\t消費讀出數據"+buffer.read()); Thread.sleep((int)(Math.random()*100)); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static class Buffer { private static final int CAPACTIY = 4; //緩衝區容量 private java.util.LinkedList<Integer> queue = new java.util.LinkedList<Integer>(); private static Lock lock = new ReentrantLock(); private static Condition notEmpty = lock.newCondition(); private static Condition notFull = lock.newCondition(); public void write(int value) { lock.lock(); try{ while(queue.size()==CAPACTIY) { System.out.println("緩衝區爆滿"); notFull.await(); } queue.offer(value); notEmpty.signalAll(); //通知全部的緩衝區未空的狀況 }catch(InterruptedException ex){ ex.printStackTrace(); }finally{ lock.unlock(); } } @SuppressWarnings("finally") public int read() { int value = 0; lock.lock(); try{ while(queue.isEmpty()) { System.out.println("\t\t緩衝區是空的,等待緩衝區非空的狀況"); notEmpty.await(); } value = queue.remove(); notFull.signal(); }catch(InterruptedException ex){ ex.printStackTrace(); }finally{ lock.unlock(); return value; } } } }
程序裏面設置的容量是4,但是這裏卻能夠存入最多5個數據,並且更合理的狀況應該是初始緩衝區是空的,後面找了下這個小bug,原來是調用offer()函數應該放在檢測語句以前,若是但願一開始就調用ConsumerTask,在main方法裏面調換二者的順序便可。
java的強大之處是它有着豐富的類庫,咱們學習java在某種程度上就是學習這些類庫。
阻塞隊列是這樣的一種隊列:當試圖向一個滿隊列裏添加元素 或者 從空隊列裏刪除元素時,隊列會讓線程自動阻塞,且當隊列滿時,隊列會繼續存儲元素,供喚醒後的線程使用。這應該說是專門爲消費者生產者模型而設計的一種隊列吧,它實現了Queue接口,主要方法是put()和take()方法。
java支持三個具體的阻塞隊列ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。都在java.util.concurrent包中。
簡單描述上面三個阻塞隊列:
ArrayBlockingQueue: 該阻塞用數組實現,按照FIFO,即先進先出的原則對數據進行排序,和數組的使用有點類似,它事先須要指定一個容量,不過即使隊列超出這個容量,也是不會報錯滴。
LinkeddBlockingQueue:用鏈表實現,默認隊列大小是Integer.MAX_VALUE,也是按照先進先出的方法對數據排序,性能可能比ArrayBlockingQueue,有待研究。
PriorityBlockingQueue:用優先隊列實現的阻塞隊列,會對元素按照大小進行排序,也能夠建立不受限制的隊列,put方法永不阻塞。
ok,看代碼:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerProducerUsingBlockQueue { private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2); public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new Consumer()); executor.execute(new Producer()); try { Thread.sleep(100); executor.shutdownNow(); //暴力關閉,會報錯,不推薦 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static class Consumer implements Runnable { @Override public void run() { try{ int i=1; while(true){ System.out.println("生成者寫入:"+i); buffer.put(i++); Thread.sleep((int)(Math.random())*1000); } }catch(InterruptedException ex){ ex.printStackTrace(); } } } public static class Producer implements Runnable { @Override public void run() { try{ while(true) { System.out.println("\t\t消費者取出"+buffer.take()); Thread.sleep((int)(Math.random())*10000); } }catch(InterruptedException ex){ ex.printStackTrace(); } } } }
沒啥大的問題,就是在關閉線程的時候太過暴力了,會報錯,線程裏面的每個函數都彷佛值得研究,以前想經過Interrupt暫停,不過失敗了,就直接使用線程池執行器的shoutdownNow方法來的。後面本身又用了另一種關閉線程的方法,見下面代碼
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class A_Control_stop { private static LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<String>(); public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new Consumer()); executor.execute(new Producer()); executor.shutdown(); while(!executor.isTerminated()){} System.out.println("全部的的線程都正常結束"); } public static class Consumer implements Runnable { private volatile boolean exit = false; @Override public void run() { try{ int i=0; String[] str ={"as","d","sd","ew","sdfg","esfr"}; while(!exit){ System.out.println("生成者寫入:"+str[i]); buffer.put(str[i++]); Thread.sleep((int)(Math.random())*10); if(5==i) { exit=true; } } }catch(InterruptedException ex){ ex.printStackTrace(); } } } public static class Producer implements Runnable { private volatile boolean exit = false; @Override public void run() { try{ int i=0; while(!exit) { System.out.println("\t\t消費者取出"+buffer.take()); i++; Thread.sleep((int)(Math.random())*10); if(5==i) { exit=true; } } }catch(InterruptedException ex){ ex.printStackTrace(); } } } }
關於阻塞隊列,以爲這篇文章講的不錯,推薦你們看看 聊聊併發----Java中的阻塞隊列
用了幾天,多線程算是學了點皮毛,附註一下:這幾天文章主要是參考了《java程序語言設計進階篇第8版》,老外寫的書講的真心不錯,只不過如今java都已經更新到java8了。在其餘一些網站上看到本身的文章,沒有說明轉載什麼的,估計是直接「被採集」過去了。
本文出自於博客園蘭幽,轉載請說明出處。