生產者 - 消費者模式在編程領域的應用很是普遍,前面咱們曾經提到,Java 線程池本質上就是用生產者 - 消費者模式實現的,因此每當使用線程池的時候,其實就是在應用生產者 - 消費者模式。數據庫
固然,除了在線程池中的應用,爲了提高性能,併發編程領域不少地方也都用到了生產者 - 消費者模式,例如 Log4j2 中異步 Appender 內部也用到了生產者 - 消費者模式。因此咱們就來深刻地聊聊生產者 - 消費者模式,看看它具體有哪些優勢,以及如何提高系統的性能。編程
生產者 - 消費者模式的核心是一個任務隊列
,生產者線程生產任務,並將任務添加到任務隊列中,而消費者線程從任務隊列中獲取任務並執行。下面是生產者 - 消費者模式的一個示意圖,你能夠結合它來理解。segmentfault
生產者 - 消費者模式示意圖####架構
從架構設計的角度來看,生產者 - 消費者模式有一個很重要的優勢,就是解耦
。解耦對於大型系統的設計很是重要,而解耦的一個關鍵就是組件之間的依賴關係和通訊方式必須受限。在生產者 - 消費者模式中,生產者和消費者沒有任何依賴關係,它們彼此之間的通訊只能經過任務隊列,因此生產者 - 消費者模式是一個不錯的解耦方案
併發
除了架構設計上的優勢以外,生產者 - 消費者模式還有一個重要的優勢就是支持異步,而且可以平衡生產者和消費者的速度差別
。在生產者 - 消費者模式中,生產者線程只須要將任務添加到任務隊列而無需等待任務被消費者線程執行完,也就是說任務的生產和消費是異步的,這是與傳統的方法之間調用的本質區別。異步
異步化處理最簡單的方式就是建立一個新的線程去處理,那中間增長一個任務隊列
」究竟有什麼用呢?主要仍是用於平衡生產者和消費者的速度差別
。咱們假設生產者的速率很慢,而消費者的速率很高,好比是 1:3,若是生產者有 3 個線程,採用建立新的線程的方式,那麼會建立 3 個子線程,而採用生產者 - 消費者模式,消費線程只須要 1 個就能夠了。函數
Java 語言裏,Java 線程和操做系統線程是一一對應的,線程建立得太多,會增長上下文切換的成本,因此 Java 線程不是越多越好,適量便可。性能
在兩階段終止模式:優雅地終止線程中,咱們提到一個監控系統動態採集的案例,其實最終回傳的監控數據仍是要存入數據庫的(以下圖)。但被監控系統每每有不少,若是每一條回傳數據都直接 INSERT 到數據庫,那DB壓力就很是大了。很顯然,更好的方案是批量執行 SQL,那如何實現呢?這就要用到生產者 - 消費者模式了。this
動態採集功能示意圖 #### 操作系統
利用生產者 - 消費者模式實現批量執行 SQL 很是簡單:將原來直接 INSERT 數據到數據庫的線程做爲生產者線程,生產者線程只需將數據添加到任務隊列,而後消費者線程負責將任務從任務隊列中批量取出並批量執行。
在下面的示例代碼中,咱們建立了 5 個消費者線程負責批量執行 SQL,這 5 個消費者線程以while(true){}
循環方式批量地獲取任務並批量地執行。須要注意的是,從任務隊列中獲取批量任務的方法 pollTasks() 中,首先是以阻塞方式獲取任務隊列中的一條任務,然後則是以非阻塞的方式獲取任務;之因此首先採用阻塞方式,是由於若是任務隊列中沒有任務,這樣的方式可以避免無謂的循環。
// 任務隊列 BlockingQueue<Task> bq=new LinkedBlockingQueue<>(2000); // 啓動 5 個消費者線程 // 執行批量任務 void start() { ExecutorService es=xecutors .newFixedThreadPool(5); for (int i=0; i<5; i++) { es.execute(()->{ try { while (true) { // 獲取批量任務 List<Task> ts=pollTasks(); // 執行批量任務 execTasks(ts); } } catch (Exception e) { e.printStackTrace(); } }); } } // 從任務隊列中獲取批量任務 List<Task> pollTasks() throws InterruptedException{ List<Task> ts=new LinkedList<>(); // 阻塞式獲取一條任務 Task t = bq.take(); while (t != null) { ts.add(t); // 非阻塞式獲取一條任務 t = bq.poll(); } return ts; } // 批量執行任務 execTasks(List<Task> ts) { // 省略具體代碼無數 }
利用生產者 - 消費者模式還能夠輕鬆地支持一種分階段提交的應用場景。咱們知道寫文件若是同步刷盤性能會很慢,因此對於不是很重要的數據,咱們每每採用異步刷盤的方式。我曾經參與過一個項目,其中的日誌組件是本身實現的,採用的就是異步刷盤方式,刷盤的時機是:
這個日誌組件的異步刷盤操做本質上其實就是一種分階段提交
。下面咱們具體看看用生產者 - 消費者模式如何實現。在下面的示例代碼中,能夠經過調用 info()和error()
方法寫入日誌,這兩個方法都是建立了一個日誌任務 LogMsg,並添加到阻塞隊列中,他們兩個方法的線程是生產者;而真正將日誌寫入文件的是消費者線程,在 Logger 這個類中,咱們只建立了 1 個消費者線程,在這個消費者線程中,會根據刷盤規則執行刷盤操做。
class Logger { // 任務隊列 final BlockingQueue<LogMsg> bq = new BlockingQueue<>(); //flush 批量 static final int batchSize=500; // 只須要一個線程寫日誌 ExecutorService es = Executors.newFixedThreadPool(1); // 啓動寫日誌線程 void start(){ File file=File.createTempFile( "foo", ".log"); final FileWriter writer= new FileWriter(file); this.es.execute(()->{ try { // 未刷盤日誌數量 int curIdx = 0; long preFT=System.currentTimeMillis(); while (true) { LogMsg log = bq.poll( 5, TimeUnit.SECONDS); // 寫日誌 if (log != null) { writer.write(log.toString()); ++curIdx; } // 若是不存在未刷盤數據,則無需刷盤 if (curIdx <= 0) { continue; } // 根據規則刷盤 if (log!=null && log.level==LEVEL.ERROR || curIdx == batchSize || System.currentTimeMillis()-preFT>5000){ writer.flush(); curIdx = 0; preFT=System.currentTimeMillis(); } } }catch(Exception e){ e.printStackTrace(); } finally { try { writer.flush(); writer.close(); }catch(IOException e){ e.printStackTrace(); } } }); } // 寫 INFO 級別日誌 void info(String msg) { bq.put(new LogMsg( LEVEL.INFO, msg)); } // 寫 ERROR 級別日誌 void error(String msg) { bq.put(new LogMsg( LEVEL.ERROR, msg)); } } // 日誌級別 enum LEVEL { INFO, ERROR } class LogMsg { LEVEL level; String msg; // 省略構造函數實現 LogMsg(LEVEL lvl, String msg){} // 省略 toString() 實現 String toString(){} }
Java 語言提供的線程池自己就是一種生產者 - 消費者模式的實現,可是線程池中的線程每次只能從任務隊列中消費一個任務來執行,對於大部分併發場景這種策略都沒有問題。可是有些場景仍是須要本身來實現,例如須要批量執行以及分階段提交的場景。