The constructor of io.moquette.spi.impl.BrokerInterceptor creates a thread pool. The code is as follows:
    private BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }
        executor = Executors.newFixedThreadPool(poolSize);
    }
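The statement executor = Executors.newFixedThreadPool(poolSize); creates a pool with a fixed number of threads, but it places no limit on the pool's task queue. If any method of an InterceptHandler does time-consuming work, then under high concurrency the queue can easily accumulate a large backlog of pending tasks, which may in turn exhaust memory (OutOfMemoryError).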
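The backlog described above can be observed directly. The following is a minimal sketch, not moquette code: UnboundedQueueDemo and backlogAfterSubmitting are hypothetical names, and the cast to ThreadPoolExecutor relies on how current OpenJDK versions implement Executors.newFixedThreadPool rather than on a documented guarantee.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo {

    /**
     * Submits `tasks` blocking tasks to a single-thread fixed pool and
     * returns how many of them are waiting in the queue.
     */
    public static int backlogAfterSubmitting(int tasks) {
        if (tasks < 1) {
            throw new IllegalArgumentException("tasks must be >= 1");
        }
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // simulate a slow InterceptHandler
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // the single worker is now busy with the first task
            // Assumption: in current OpenJDK, newFixedThreadPool returns a plain ThreadPoolExecutor.
            return ((ThreadPoolExecutor) executor).getQueue().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let the worker drain the queue
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Every task except the one currently running sits in the queue; nothing is ever rejected.
        System.out.println("queued tasks: " + backlogAfterSubmitting(10_000));
    }
}
```

Because the queue never refuses a task, the backlog grows with the number of submissions, and each queued Runnable keeps whatever it references reachable until it runs.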
Add the following classes and interface:
    // ThreadPoolHelper.java
    public class ThreadPoolHelper {
        public static ExecutorService createFixedExecutor(int threadNum, int capacity, String threadFactoryName) {
            return new ThreadPoolExecutor(
                    threadNum,
                    threadNum,
                    // Keep-alive has no effect while core == max, unless allowCoreThreadTimeOut(true) is set
                    30,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>(capacity), // bounded task queue
                    new SimpleThreadFactory(threadFactoryName),
                    new LogDiscardRejectPolicy()
            );
        }
    }

    // SimpleThreadFactory.java
    public class SimpleThreadFactory implements ThreadFactory {
        private static final String NAME_FORMAT = "%s-%s";
        private String threadNamePrefix;

        public SimpleThreadFactory(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            // The suffix is a timestamp, so two threads created in the same millisecond share a name
            thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis()));
            return thread;
        }
    }

    // LogDiscardRejectPolicy.java
    public class LogDiscardRejectPolicy implements RejectedExecutionHandler {
        private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class);

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            LOG.error("executor:{} task queue is full, runnable:{} discarded", executor, r);
            // Only PublishTask instances are forwarded; every other task is simply dropped
            if (!(r instanceof PublishTask)) {
                return;
            }
            PublishTask publishTask = (PublishTask) r;
            InterceptHandler interceptHandler = publishTask.getInterceptHandler();
            if (!(interceptHandler instanceof RejectHandler)) {
                return;
            }
            ((RejectHandler) interceptHandler).rejectedExecution(r, executor);
        }
    }

    // RejectHandler.java
    public interface RejectHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
Change the thread pool creation logic in BrokerInterceptor to:
    private BrokerInterceptor(int poolSize, int capacity, List<InterceptHandler> handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }
        /** modify by liuhh */
        executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME);
        //executor = Executors.newFixedThreadPool(poolSize);
    }
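Explanation:
(1) The createFixedExecutor() method in ThreadPoolHelper gives the new thread pool a bounded task queue and the rejection policy LogDiscardRejectPolicy.
(2) LogDiscardRejectPolicy first logs the rejected task. A PublishTask (see Moquette Modification Notes (2): Optimizing the BrokerInterceptor notifyTopicPublished() Logic) gets special handling: if its InterceptHandler implements RejectHandler, the task is handed to that handler, so the business logic decides what to do with tasks that were discarded because the pool could not keep up.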
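The effect of the two points above can be sketched without any moquette classes. This is an illustrative example under stated assumptions: BoundedPoolDemo and countRejections are hypothetical names, and a counting lambda stands in for LogDiscardRejectPolicy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    /**
     * Submits `tasks` blocking tasks to a pool with `threads` workers and a
     * queue of `capacity` slots, and returns how many tasks were rejected.
     */
    public static int countRejections(int threads, int capacity, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 30, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(capacity),
                // Stand-in for LogDiscardRejectPolicy: runs in the submitting thread
                (task, pool) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // 2 tasks occupy the workers, 3 wait in the queue, the remaining 5 are rejected.
        System.out.println("rejected: " + countRejections(2, 3, 10));
    }
}
```

With all workers blocked, the pool accepts at most threads + capacity tasks. Everything beyond that goes to the rejection handler instead of piling up in memory.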
In the SafetyInterceptHandler from Moquette Modification Notes (1): Integrating into Spring Boot, add an implementation of RejectHandler as follows:
    @Slf4j
    @Component
    public class SafetyInterceptHandler extends AbstractInterceptHandler implements RejectHandler {

        @Override
        public String getID() {
            return SafetyInterceptHandler.class.getName();
        }

        @Override
        public void onConnect(InterceptConnectMessage msg) {
        }

        @Override
        public void onConnectionLost(InterceptConnectionLostMessage msg) {
        }

        @Override
        public void onPublish(InterceptPublishMessage msg) {
        }

        @Override
        public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            /** Called back when the MQTT service is overloaded and cannot keep up */
            // For example, send an email to notify the people responsible
        }
    }
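One thing to watch when filling in rejectedExecution(): under sustained overload it may be called once per discarded task, so sending an email on every call could flood the recipients. The following is a minimal sketch of one way to throttle such notifications. ThrottledAlert and tryAcquire are hypothetical names that are not part of moquette or of the original code.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ThrottledAlert {
    private final long minIntervalMillis;
    // Earliest timestamp at which the next alert is allowed; the first call always passes
    private final AtomicLong nextAllowedAt = new AtomicLong(Long.MIN_VALUE);

    public ThrottledAlert(long minIntervalMillis) {
        this.minIntervalMillis = minIntervalMillis;
    }

    /**
     * Returns true if an alert may be sent at `nowMillis`, and in that case
     * reserves the next interval. Safe to call from several threads.
     */
    public boolean tryAcquire(long nowMillis) {
        while (true) {
            long next = nextAllowedAt.get();
            if (nowMillis < next) {
                return false; // still inside the quiet period
            }
            if (nextAllowedAt.compareAndSet(next, nowMillis + minIntervalMillis)) {
                return true; // this caller won the right to alert
            }
            // Another thread updated the timestamp first; re-check
        }
    }
}
```

Inside rejectedExecution() a handler could hold one ThrottledAlert field and send the notification only when tryAcquire(System.currentTimeMillis()) returns true. The interval itself is a business decision.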
Moquette Modification Notes (4): Fixing onConnectionLost() Being Called Twice in InterceptHandler