場景html
一個線程從某個地方接收消息(數據),能夠是其餘主機或者消息隊列,而後轉由另外的一個線程池來執行具體處理消息的邏輯,而且消息的處理速度小於接收消息的速度。這種情景很常見,試想一下,你會怎麼設計和實現?java
直觀想法多線程
很顯然採用JUC的線程框架,能夠迅速寫出代碼。架構
消息接收者:框架
- public class Receiver {
- private static volatile boolean inited = false;
- private static volatile boolean shutdown = false;
- private static volatile int cnt = 0;
- private MessageHandler messageHandler;
- public void start(){
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- while(!shutdown){
- init();
- recv();
- }
- }
- });
- }
- /**
- * 模擬消息接收
- */
- public void recv(){
- Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start();
- }
- }
消息處理:jvm
- public class MessageHandler {
- private static final int THREAD_POOL_SIZE = 4;
- private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
- public void handle(Message msg) {
- try {
- service.execute(new Runnable() {
- @Override
- public void run() {
- parseMsg(msg);
- }
- });
- } catch (Throwable e) {
- System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("解析消息:" + message); Thread.sleep(5000); System.out.println("============================"); } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
效果:這種方案致使的現象是接收到的消息會迅速堆積,咱們從消息隊列(或者其餘地方)取出了大量消息,可是處理線程的速度又跟不上,因此致使的問題是大量的Task會堆積在線程池底層維護的一個阻塞隊列中,這會極大的耗費存儲空間,影響系統的性能。ide
分析:當execute()一個任務的時候,若是有空閒的worker線程,那麼投入運行,不然看設置的最大線程個數,沒有達到線程個數限制就建立新線程,接新任務,不然就把任務緩衝到一個阻塞隊列中,問題就是這個隊列,默認的大小是沒有限制的,因此就會大量的堆積任務,必然耗費heap空間。性能
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
- public LinkedBlockingQueue() {
- this(Integer.MAX_VALUE); // capacity
- }
計數限制學習
面對上述問題,想到了要限制消息接收的速度,天然就想到了各類線程同步的原語,不過在這裏最簡單的就是使用一個Volatile的計數器。this
消息接收者:
- public class Receiver {
- private static volatile boolean inited = false;
- private static volatile boolean shutdown = false;
- private static volatile int cnt = 0;
- private MessageHandler messageHandler;
- public void start(){
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- while(!shutdown){
- init();
- recv();
- }
- }
- });
- }
- /**
- * 模擬消息接收
- */
- public void recv(){
- Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start();
- }
- }
消息處理:
- public class MessageHandler {
- private static final int THREAD_POOL_SIZE = 1;
- private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
- public void handle(Message msg){
- try {
- service.execute(new Runnable() {
- @Override
- public void run() {
- parseMsg(msg);
- }
- });
- } catch (Throwable e) {
- System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message){ try { Thread.sleep(10000); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); }finally {
- Receiver.limit --;
- }
- }
- }
效果:經過控制消息的個數來阻塞消息的接收過程,就不會致使任務的堆積,系統的內存消耗會比較平緩,限制消息的個數本質就和下面限制任務隊列大小同樣。
使用同步隊列 SynchronousQueue
SynchronousQueue 雖名爲隊列,可是其實不會緩衝任務的對象,只是做爲對象傳遞的控制點,若是有空閒線程或者沒有達到最大線程限制,就會交付給worker線程去執行,不然就會拒絕,咱們須要本身實現對應的拒絕策略RejectedExecutionHandler,默認的是拋出異常RejectedExecutionException。
消息接收者同上。
消息處理:
- public class MessageHandler {
- private static final int THREAD_POOL_SIZE = 4;
- ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
- new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- System.out.println("自定義拒絕策略"); try { executor.getQueue().put(r); System.out.println("從新聽任務回隊列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { System.out.println(service.getTaskCount()); System.out.println(service.getQueue().size()); System.out.println(service.getCompletedTaskCount()); service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("線程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); Thread.sleep(1000); } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
效果:可以控制消息的接收速度,可是咱們須要在rejectedExecution中實現某種阻塞的操做,可是選擇在發生拒絕的時候把任務從新放回隊列,帶來的問題就是這個Task會發生飢餓現象。
使用大小限制的阻塞隊列
使用LinkedBlockingQueue做爲線程框架底層的任務緩衝區,而且設置大小限制,思想上和上述方案同樣,都是有一個阻塞的點,可是經過最後的jvm monitor看到這裏的CPU消耗更少,內存使用有所下降,而且波動小(具體緣由有待探索)。
消息接收者同上。
消息處理:
- public class MessageHandler {
- private static final int THREAD_POOL_SIZE = 4;
- private static final int BLOCK_QUEUE_CAP = 500;
- ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_CAP), new SimpleThreadFactory(), new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- System.out.println("自定義拒絕策略"); try { executor.getQueue().put(r); System.out.println("從新聽任務回隊列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { try { Thread.sleep(5000); System.out.println("線程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); } } static class SimpleThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Thread-" + System.currentTimeMillis()); return thread;
- }
- }
- }
總結
多線程是比較容易出問題的地方,特別當對方法不熟悉的時候
感興趣的能夠本身來個人Java架構羣,能夠獲取免費的學習資料,羣號:855801563對Java技術,架構技術感興趣的同窗,歡迎加羣,一塊兒學習,相互討論。