對最近遇到的業務應用dubbo線程池爆滿(異常:RejectedExecutionException:Thread pool is EXHAUSTED)問題進行了分析。java
1、問題回顧:spring
業務應用dubbo配置以下:express
<dubbo:protocol name="dubbo" port="${dubbo.port}" /}
在dubbo的spring配置中,業務應用並無配置threadpool, threads等參數,查看dubbo.io用戶文檔,可知:默認配置爲:apache
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
查看dubbo代碼(版本爲2.5.3)實現:FixedThreadPool.java緩存
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.common.threadpool.support.fixed; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.threadpool.ThreadPool; import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; import com.alibaba.dubbo.common.utils.NamedThreadFactory; /** * 此線程池啓動時即建立固定大小的線程數,不作任何伸縮,來源於:<code>Executors.newFixedThreadPool()</code> * * @see java.util.concurrent.Executors#newFixedThreadPool(int) * @author william.liangf */ public class FixedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
其中:安全
public class Constants { // the other constant // ...... public static final String DEFAULT_THREAD_NAME = "Dubbo"; public static final int DEFAULT_THREADS = 200; public static final int DEFAULT_QUEUES = 0; }
由代碼可知:dubbo的線程池採用jdk的ThreadPoolExecutor,默認threads數爲200,默認隊列長度爲0,此時默認採用了SynchronousQueue隊列,而若是用戶配置的隊列長度大於0時,則會採用LinkedBlockingQueue隊列。【注意:因而可知,dubbo.io用戶文檔中,默認threads=100是錯誤的,實際爲200】app
SynchronousQueue:是一個緩存爲1的阻塞隊列,某次添加元素後必須等待其餘線程取走後才能繼續添加。less
LinkedBlockingQueue:線程安全的基於鏈表的阻塞隊列,實現了入隊出隊的分離(分別採用putLock和takeLock鎖,所以能夠同時入隊和出隊操做)位於java.util.concurrent包下,是一個無界隊列。ide
ThreadPoolExecutor的完整構造方法的簽名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) . corePoolSize - 池中所保存的線程數,包括空閒線程。 maximumPoolSize-池中容許的最大線程數。 keepAliveTime - 當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。 unit - keepAliveTime 參數的時間單位。 workQueue - 執行前用於保持任務的隊列。此隊列僅保持由 execute方法提交的 Runnable任務。 threadFactory - 執行程序建立新線程時使用的工廠。 handler - 因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序。 ThreadPoolExecutor是Executors類的底層實現。
dubbo默認建立固定大小的線程池(200), 每次提交一個任務就建立一個線程,直到線程數達到線程池大小200,線程池的大小一旦達到最大值就保持不變。若是某個線程由於執行異常而結束,那麼線程池會補充一個新的線程。ui
因爲dubbo默認採用了直接提交的SynchronousQueue工做隊列,因此,全部的task會直接提交給線程池中的某一worker線程,若是沒有可用線程,那麼會拒絕任務的處理,而拋出異常RejectedExecutionException:Thread pool is EXHAUSTED.
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.common.threadpool.support; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; /** * Abort Policy. * Log warn info when abort. * * @author ding.lid */ public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" , threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); throw new RejectedExecutionException(msg); } }
所以,若是系統拋出異常RejectedExecutionException:Thread pool is EXHAUSTED.能夠經過調大dubbo線程池解決該問題,或者下降對TPS的預期。
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="500" />
此處修改了dubbo線程池爲500,以處理更多的任務。