本文源碼:GitHub·點這裏 || GitEE·點這裏java
在操做系統中,線程是個獨立的個體,可是在線程執行過程當中,若是處理同一個業務邏輯,可能會產生資源爭搶,致使併發問題,一般使用互斥鎖來控制該邏輯。可是在還有這樣一類場景,任務執行是有順序控制的,例如常見的報表數據生成:git
該場景在相對複雜的系統中很是常見,若是基於多線程來描述該過程,則須要線程之間通訊協做,纔能有條不紊的處理該場景業務。github
如上的業務場景,若是線程A生成數據過程當中,線程B一直在訪問數據容器,判斷該過程的數據是否已經生成,則會形成資源浪費。正常的流程應該如圖,線程A和線程B同時啓動,線程A開始處理數據生成任務,線程B嘗試獲取容器數據,數據還沒過來,線程B則進入等待狀態,當線程A的任務處理完成,則通知線程B去容器中獲取數據,這樣基於線程等待和通知的機制來協做完成任務。segmentfault
等待/通知機制的相關方法是Java中Object層級的基礎方法,任何對象都有該方法:多線程
線程的等待通知機制,就是基於這幾個基礎方法。併發
等待/通知機制,該模式下指線程A在不知足任務執行的狀況下調用對象wait()方法進入等待狀態,線程B修改了線程A的執行條件,並調用對象notify()或者notifyAll()方法,線程A收到通知後從wait狀態返回,進而執行後續操做。兩個線程經過基於對象提供的wait()/notify()/notifyAll()等方法完成等待和通知間交互,提升程序的可伸縮性。ide
經過線程通訊解決上述數據生成和存儲任務的解耦流程。this
public class NotifyThread01 { static Object lock = new Object() ; static volatile List<String> dataList = new ArrayList<>(); public static void main(String[] args) throws Exception { Thread saveThread = new Thread(new SaveData(),"SaveData"); saveThread.start(); TimeUnit.SECONDS.sleep(3); Thread dataThread = new Thread(new AnalyData(),"AnalyData"); dataThread.start(); } // 等待數據生成,保存 static class SaveData implements Runnable { @Override public void run() { synchronized (lock){ while (dataList.size()==0){ try { System.out.println(Thread.currentThread().getName()+"等待..."); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("SaveData .."+ dataList.get(0)+dataList.get(1)); } } } // 生成數據,通知保存 static class AnalyData implements Runnable { @Override public void run() { synchronized (lock){ dataList.add("hello,"); dataList.add("java"); lock.notify(); System.out.println("AnalyData End..."); } } } }
注意:除了dataList知足寫條件,還要在AnalyData線程執行通知操做。spa
基本概念操作系統
管道流主要用於在不一樣線程間直接傳送數據,一個線程發送數據到輸出管道,另外一個線程從輸入管道中讀取數據,進而實現不一樣線程間的通訊。
實現分類
管道字節流:PipedInputStream和PipedOutputStream;
管道字符流:PipedWriter和PipedReader;
新IO管道流:Pipe.SinkChannel和Pipe.SourceChannel;
public class NotifyThread02 { public static void main(String[] args) throws Exception { PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(); // 連接輸入流和輸出流 pos.connect(pis); // 寫數據線程 new Thread(new Runnable() { public void run() { BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); // 將從鍵盤讀取的數據寫入管道流 PrintStream ps = new PrintStream(pos); while (true) { try { System.out.print(Thread.currentThread().getName()); ps.println(br.readLine()); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } } }, "輸入數據線程:").start(); // 讀數據線程 new Thread(new Runnable() { public void run() { BufferedReader br = new BufferedReader(new InputStreamReader(pis)); while (true) { try { System.out.println(Thread.currentThread().getName() + br.readLine()); } catch (IOException e) { e.printStackTrace(); } } } }, "輸出數據線程:").start(); } }
寫線程向管道流寫入數據,讀線程讀取數據,完成基本通訊流程。
基於線程等待通知機制:實現工廠生產一件商品,通知商店賣出一件商品的業務流程。
public class NotifyThread03 { public static void main(String[] args) { Product product = new Product(); ProductFactory productFactory = new ProductFactory(product); ProductShop productShop = new ProductShop(product); productFactory.start(); productShop.start(); } } // 產品 class Product { public String name ; public double price ; // 產品是否生產完畢,默認沒有 boolean flag ; } // 產品工廠:生產 class ProductFactory extends Thread { Product product ; public ProductFactory (Product product){ this.product = product; } @Override public void run() { int i = 0 ; while (i < 20) { synchronized (product) { if (!product.flag){ if (i%2 == 0){ product.name = "鼠標"; product.price = 79.99; } else { product.name = "鍵盤"; product.price = 89.99; } System.out.println("產品:"+product.name+"【價格:"+product.price+"】出廠..."); product.flag = true ; i++; // 通知消費者 product.notifyAll(); } else { try { // 進入等待狀態 product.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } // 產品商店:銷售 class ProductShop extends Thread { Product product ; public ProductShop (Product product){ this.product = product ; } @Override public void run() { while (true) { synchronized (product) { if (product.flag == true ){ System.out.println("產品:"+product.name+"【價格"+(product.price*2)+"】賣出..."); product.flag = false ; product.notifyAll(); //喚醒生產者 } else { try { product.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
流程描述:ProductFactory生成一件商品,通知商店售賣,經過flag標識判斷控制是否進入等待狀態,商店賣出商品後,再次通知工廠生產商品。
GitHub·地址 https://github.com/cicadasmile/java-base-parent GitEE·地址 https://gitee.com/cicadasmile/java-base-parent
序號 | 文章標題 |
---|---|
01 | Java併發:線程的建立方式,狀態週期管理 |
02 | Java併發:線程核心機制,基礎概念擴展 |
03 | Java併發:多線程併發訪問,同步控制 |