模仿Tomcat的BIO模型,來一個消息,分配一個線程處理.java
則主線程池代碼以下異步
package com.guanjian; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by Administrator on 2018/7/10. */ public class ThreadPool { private ExecutorService service; private List<MessageTask> tasks; private int fixedThreadNum = 0; private List<String> messages; private MessageHandler messageHandler; public ThreadPool(int fixedThreadNum,List<String> messages,MessageHandler messageHandler) { this.fixedThreadNum = fixedThreadNum; this.messages = messages; this.messageHandler = messageHandler; service = Executors.newFixedThreadPool(fixedThreadNum); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutdownGracefully(service); } }); } public void shutdownGracefully(ExecutorService ThreadPool) { ShutdownPool.shutdownThreadPool(ThreadPool, "main-pool"); } public void startup() { tasks = new ArrayList<>(); MessageTask messageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(messageHandler,messages) : new ConcurrentMessageTask(messageHandler,messages)); for (String message:messages) { tasks.add(messageTask); service.execute(messageTask); } } }
它是經過線程數fixedThreadNum來區分使用哪一種線程模型.async
package com.guanjian; /** * Created by Administrator on 2018/7/10. */ public interface MessageHandler { public void execute(String message); }
package com.guanjian; /** * Created by Administrator on 2018/7/10. */ public class MessageHandlerImpl implements MessageHandler { @Override public void execute(String message) { System.out.println(message); } }
以上是消息處理器的接口和實現類ide
package com.guanjian; import java.util.List; /** * Created by Administrator on 2018/7/10. */ public abstract class MessageTask implements Runnable { protected MessageHandler messageHandler; protected List<String> messages; MessageTask(MessageHandler messageHandler,List<String> messages) { this.messageHandler = messageHandler; this.messages = messages; } @Override public void run() { for (String message:messages) { handlerMessage(message); } } protected abstract void handlerMessage(String message); }
消息任務抽象類實現了Runnable線程接口,以不一樣的子類來實現BIO,NIO線程模型,具體在抽象方法handlerMessage中實現.性能
package com.guanjian; import java.util.List; /** * Created by Administrator on 2018/7/10. */ public class SequentialMessageTask extends MessageTask { SequentialMessageTask(MessageHandler messageHandler, List<String> messages) { super(messageHandler, messages); } @Override protected void handlerMessage(String message) { messageHandler.execute(message); } }
BIO線程模型子類,經過主線程池來分配線程處理.this
package com.guanjian; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by Administrator on 2018/7/10. */ public class ConcurrentMessageTask extends MessageTask { private ExecutorService asyncService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); ConcurrentMessageTask(MessageHandler messageHandler, List<String> messages) { super(messageHandler, messages); } @Override protected void handlerMessage(String message) { asyncService.submit(new Runnable() { @Override public void run() { messageHandler.execute(message); } }); } protected void shutdown() { ShutdownPool.shutdownThreadPool(asyncService,"async-pool-" + Thread.currentThread().getId()); } }
NIO線程模型,再也不使用主線程池來分配線程,而是異步線程池,類比於Netty中的Worker線程池,從BOSS線程池中接管消息處理.線程
package com.guanjian; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** * Created by Administrator on 2018/7/10. */ public class ShutdownPool { private static Logger log = LoggerFactory.getLogger(ThreadPool.class); /** * 優雅關閉線程池 * @param threadPool * @param alias */ public static void shutdownThreadPool(ExecutorService threadPool, String alias) { log.info("Start to shutdown the thead pool: {}", alias); threadPool.shutdown(); // 使新任務沒法提交. try { // 等待未完成任務結束 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); // 取消當前執行的任務 log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs."); // 等待任務取消的響應 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs."); } } catch (InterruptedException ie) { // 從新取消當前線程進行中斷 threadPool.shutdownNow(); log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs."); // 保留中斷狀態 Thread.currentThread().interrupt(); } log.info("Finally shutdown the thead pool: {}", alias); } }
最後是線程池的優雅關閉,不管是主線程池仍是異步線程池皆調用該方法實現優雅關閉.server
以上只是模型代碼,具體可替換成具體須要的業務代碼來達到業務性能的提高.接口