歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我微信「java_front」一塊兒交流學習java
你們可能都遇到過DUBBO線程池打滿這個問題,剛開始遇到這個問題可能會比較慌,常見方案可能就是重啓服務,但也不知道重啓是否能夠解決。我認爲重啓不只不能解決問題,甚至有可能加重問題,這是爲何呢?本文咱們就一塊兒分析DUBBO線程池打滿這個問題。linux
DUBBO底層網絡通訊採用Netty框架,咱們編寫一個Netty服務端進行觀察:spring
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(7777).sync();
System.out.println("服務端準備就緒");
channelFuture.channel().closeFuture().sync();
} catch (Exception ex) {
System.out.println(ex.getMessage());
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
複製代碼
BossGroup線程組只有一個線程處理客戶端鏈接請求,鏈接完成後將完成三次握手的SocketChannel鏈接分發給WorkerGroup處理讀寫請求,這兩個線程組被稱爲「IO線程」。數據庫
咱們再引出「業務線程」這個概念。服務生產者接收到請求後,若是處理邏輯能夠快速處理完成,那麼能夠直接放在IO線程處理,從而減小線程池調度與上下文切換。可是若是處理邏輯很是耗時,或者會發起新IO請求例如查詢數據庫,那麼必須派發到業務線程池處理。apache
DUBBO提供了多種線程模型,選擇線程模型須要在配置文件指定dispatcher屬性:bootstrap
<dubbo:protocol name="dubbo" dispatcher="all" />
<dubbo:protocol name="dubbo" dispatcher="direct" />
<dubbo:protocol name="dubbo" dispatcher="message" />
<dubbo:protocol name="dubbo" dispatcher="execution" />
<dubbo:protocol name="dubbo" dispatcher="connection" />
複製代碼
不一樣線程模型在選擇是使用IO線程仍是業務線程,DUBBO官網文檔說明以下:windows
all
全部消息都派發到業務線程池,包括請求,響應,鏈接事件,斷開事件,心跳
direct
全部消息都不派發到業務線程池,所有在IO線程直接執行
message
只有請求響應消息派發到業務線程池,其它鏈接斷開事件,心跳等消息直接在IO線程執行
execution
只有請求消息派發到業務線程池,響應和其它鏈接斷開事件,心跳等消息直接在IO線程執行
connection
在IO線程上將鏈接斷開事件放入隊列,有序逐個執行,其它消息派發到業務線程池
複製代碼
生產者和消費者在初始化時會肯定線程模型:數組
// 生產者
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
// 消費者
public class NettyClient extends AbstractClient {
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
}
複製代碼
生產者和消費者默認線程模型都是AllDispatcher,ChannelHandlers.wrap方法能夠獲取Dispatch自適應擴展點。若是咱們在配置文件中指定dispatcher,擴展點加載器會從URL獲取屬性值加載對應線程模型。本文以生產者爲例進行分析:緩存
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// ChannelHandlers.wrap肯定線程策略
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
public class ChannelHandlers {
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
}
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
@Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
複製代碼
咱們分析其中兩個線程模型源碼,其它線程模型請閱讀DUBBO源碼。AllDispatcher模型全部消息都派發到業務線程池,包括請求,響應,鏈接事件,斷開事件,心跳:微信
public class AllDispatcher implements Dispatcher {
// 線程模型名稱
public static final String NAME = "all";
// 具體實現策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void connected(Channel channel) throws RemotingException {
// 鏈接完成事件交給業務線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
// 斷開鏈接事件交給業務線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 請求響應事件交給業務線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException) {
Request request = (Request)message;
if(request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
// 異常事件交給業務線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);
}
}
}
複製代碼
DirectDispatcher策略全部消息都不派發到業務線程池,所有在IO線程直接執行:
public class DirectDispatcher implements Dispatcher {
// 線程模型名稱
public static final String NAME = "direct";
// 具體實現策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
// 直接返回handler表示全部事件都交給IO線程處理
return handler;
}
}
複製代碼
上個章節分析了線程模型,咱們知道不一樣的線程模型會選擇使用仍是IO線程仍是業務線程。若是使用業務線程池,那麼使用什麼線程池策略是本章節須要回答的問題。DUBBO官網線程派發模型圖展現了線程模型和線程池策略的關係:
DUBBO提供了多種線程池策略,選擇線程池策略須要在配置文件指定threadpool屬性:
<dubbo:protocol name="dubbo" threadpool="fixed" threads="100" />
<dubbo:protocol name="dubbo" threadpool="cached" threads="100" />
<dubbo:protocol name="dubbo" threadpool="limited" threads="100" />
<dubbo:protocol name="dubbo" threadpool="eager" threads="100" />
複製代碼
不一樣線程池策略會建立不一樣特性的線程池:
fixed
包含固定個數線程
cached
線程空閒一分鐘會被回收,當新請求到來時會建立新線程
limited
線程個數隨着任務增長而增長,但不會超過最大閾值。空閒線程不會被回收
eager
當全部核心線程數都處於忙碌狀態時,優先建立新線程執行任務,而不是當即放入隊列
複製代碼
本文以AllDispatcher爲例分析線程池策略肯定時機:
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
}
複製代碼
在WrappedChannelHandler構造函數中若是配置指定threadpool屬性,擴展點加載器會從URL獲取屬性值加載對應線程池策略,默認策略爲fixed:
public class WrappedChannelHandler implements ChannelHandlerDelegate {
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 獲取線程池自適應擴展點
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
}
@SPI("fixed")
public interface ThreadPool {
@Adaptive({Constants.THREADPOOL_KEY})
Executor getExecutor(URL url);
}
複製代碼
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 線程名稱
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 線程個數默認200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 隊列容量默認0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 隊列容量等於0使用阻塞隊列SynchronousQueue
// 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue
// 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>()
: (queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
複製代碼
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 獲取線程名稱
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心線程數默認0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大線程數默認Int最大值
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 隊列容量默認0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 線程空閒多少時間被回收默認1分鐘
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// 隊列容量等於0使用阻塞隊列SynchronousQueue
// 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue
// 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>()
: (queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
複製代碼
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 獲取線程名稱
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心線程數默認0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大線程數默認200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 隊列容量默認0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 隊列容量等於0使用阻塞隊列SynchronousQueue
// 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue
// 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue
// keepalive時間設置Long.MAX_VALUE表示不回收空閒線程
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>()
: (queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
複製代碼
咱們知道ThreadPoolExecutor是普通線程執行器。當線程池核心線程達到閾值時新任務放入隊列,當隊列已滿開啓新線程處理,當前線程數達到最大線程數時執行拒絕策略。
可是EagerThreadPool自定義線程執行策略,當線程池核心線程達到閾值時,新任務不會放入隊列而是開啓新線程進行處理(要求當前線程數沒有超過最大線程數)。當前線程數達到最大線程數時任務放入隊列。
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 線程名
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心線程數默認0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大線程數默認Int最大值
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 隊列容量默認0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 線程空閒多少時間被回收默認1分鐘
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// 初始化自定義線程池和隊列重寫相關方法
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
複製代碼
咱們知道DUBBO會選擇線程池策略進行業務處理,那麼如何估算可能產生的線程數呢?咱們首先分析一個問題:一個公司有7200名員工,天天上班打卡時間是早上8點到8點30分,每次打卡時間系統耗時5秒。請問RT、QPS、併發量分別是多少?
RT表示響應時間,問題已經告訴了咱們答案:
RT = 5
QPS表示每秒查詢量,假設簽到行爲平均分佈:
QPS = 7200 / (30 * 60) = 4
併發量表示系統同時處理的請求數量:
併發量 = QPS x RT = 4 x 5 = 20
根據上述實例引出以下公式:
併發量 = QPS x RT
若是系統爲每個請求分配一個處理線程,那麼併發量能夠近似等於線程數。基於上述公式不難看出併發量受QPS和RT影響,這兩個指標任意一個上升就會致使併發量上升。
可是這只是理想狀況,由於併發量受限於系統能力而不可能持續上升,例如DUBBO線程池就對線程數作了限制,超出最大線程數限制則會執行拒絕策略,而拒絕策略會提示線程池已滿,這就是DUBBO線程池打滿問題的根源。下面咱們分別分析RT上升和QPS上升這兩個緣由。
<beans>
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:protocol name="dubbo" port="9999" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.HelloService" ref="helloService" />
</beans>
複製代碼
package com.java.front.dubbo.demo.provider;
public interface HelloService {
public String sayHello(String name) throws Exception;
}
public class HelloServiceImpl implements HelloService {
public String sayHello(String name) throws Exception {
String result = "hello[" + name + "]";
// 模擬慢服務
Thread.sleep(10000L);
System.out.println("生產者執行結果" + result);
return result;
}
}
複製代碼
<beans>
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" />
</beans>
複製代碼
public class Consumer {
@Test
public void testThread() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:METAINF/spring/dubbo-consumer.xml" });
context.start();
for (int i = 0; i < 500; i++) {
new Thread(new Runnable() {
@Override
public void run() {
HelloService helloService = (HelloService) context.getBean("helloService");
String result;
try {
result = helloService.sayHello("微信公衆號「JAVA前線」");
System.out.println("客戶端收到結果" + result);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}).start();
}
}
}
複製代碼
依次運行生產者和消費者代碼,會發現日誌中會出現報錯信息。生產者日誌會打印線程池已滿:
Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 201 (completed: 1), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999!
at org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:67)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:88)
... 25 more
複製代碼
消費者日誌不只會打印線程池已滿,還會打印服務提供者信息和調用方法,咱們能夠根據日誌找到哪個方法有問題:
Failed to invoke the method sayHello in the service com.java.front.dubbo.demo.provider.HelloService.
Tried 3 times of the providers [x.x.x.x:9999] (1/1) from the registry 127.0.0.1:2181 on the consumer x.x.x.x
using the dubbo version 2.7.0-SNAPSHOT. Last error is: Failed to invoke remote method: sayHello,
provider: dubbo://x.x.x.x:9999/com.java.front.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer1&check=false&dubbo=2.0.2&generic=false&group=&interface=com.java.front.dubbo.demo.provider.HelloService&logger=log4j&methods=sayHello&pid=33432®ister.ip=x.x.x.x&release=2.7.0-SNAPSHOT&remote.application=xpz-provider&remote.timestamp=1618632597509&side=consumer&timeout=100000000×tamp=1618632617392,
cause: Server side(x.x.x.x,9999) threadpool is exhausted ,detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 401 (completed: 201), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999!
複製代碼
DUBBO線程池打滿時會執行拒絕策略:
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
// 打印線程快照
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
long now = System.currentTimeMillis();
// 每10分鐘輸出線程快照
if (now - lastPrintTime < 10 * 60 * 1000) {
return;
}
if (!guard.tryAcquire()) {
return;
}
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
System.out.println("AbortPolicyWithReport dumpJStack directory=" + dumpPath);
SimpleDateFormat sdf;
String os = System.getProperty("os.name").toLowerCase();
// linux文件位置/home/xxx/Dubbo_JStack.log.2021-01-01_20:50:15
// windows文件位置/user/xxx/Dubbo_JStack.log.2020-01-01_20-50-15
if (os.contains("win")) {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
} else {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
}
String dateStr = sdf.format(new Date());
try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
} finally {
guard.release();
}
lastPrintTime = System.currentTimeMillis();
});
pool.shutdown();
}
}
複製代碼
拒絕策略會輸出線程快照文件以保護現場,在分析線程快照文件時BLOCKED和TIMED_WAITING線程狀態須要咱們重點關注。若是發現大量線程阻塞或者等待狀態則能夠定位到具體代碼行:
DubboServerHandler-x.x.x.x:9999-thread-200 Id=230 TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.java.front.dubbo.demo.provider.HelloServiceImpl.sayHello(HelloServiceImpl.java:13)
at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java)
at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:56)
at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:85)
at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56)
at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
複製代碼
如今咱們已經找到了慢服務,此時能夠優化慢服務了。優化慢服務就須要具體問題具體分析了,這不是本文的重點在此不進行展開。
還有一種RT上升的狀況是咱們不能忽視的,這種狀況就是提供者重啓後預熱不充分即被調用。由於當生產者剛啓動時須要預熱,須要和其它資源例如數據庫、緩存等創建鏈接,創建鏈接是須要時間的。若是此時大量消費者請求到未預熱的生產者,鏈路時間增長了鏈接時間,RT時間必然會增長,從而也會致使DUBBO線程池打滿的問題。
由於生產者預熱不充分致使線程池打滿問題,最容易發生在系統發佈時。例如發佈了一臺機器後發現線上出現線程池打滿問題,不要着急重啓機器,而是給機器一段時間預熱,等鏈接創建後問題會消失。同時咱們在發佈時也要分多批發布,不用一次發佈太多致使服務由於預熱問題形成大面積影響。
DUBBO消費者在調用時自己就有預熱機制,爲何還會出現預熱不充分問題?這是由於2.5.5以前版本以及2.7.2版本預熱機制是有問題的,簡而言之就是獲取啓動時間不正確致使預熱失效,2.7.4版本完全解決了這個問題,因此咱們要避免使用問題版本。下面咱們閱讀2.7.0版本的預熱機制源碼,看一看預熱機制如何發揮做用:
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// invokers數量
int length = invokers.size();
// 權重是否相同
boolean sameWeight = true;
// invokers權重數組
int[] weights = new int[length];
// 第一個invoker權重
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// 權重值之和
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
// 計算權重值
int weight = getWeight(invokers.get(i), invocation);
weights[i] = weight;
totalWeight += weight;
// 任意一個invoker權重值不等於第一個invoker權重值則sameWeight設置爲FALSE
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
// 權重值不等則根據總權重值計算
if (totalWeight > 0 && !sameWeight) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// 不斷減去權重值當小於0時直接返回
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// 全部服務權重值一致則隨機返回
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// uptime/(warmup*weight)
// 若是當前服務提供者沒過預熱期,用戶設置的權重將經過uptime/warmup減少
// 若是服務提供者設置權重很大可是還沒過預熱時間,從新計算權重會很小
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 獲取invoker設置權重值默認權重=100
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
// 若是權重大於0
if (weight > 0) {
// 服務提供者發佈服務時間戳
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 服務已經發布多少時間
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 預熱時間默認10分鐘
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
// 生產者發佈時間大於0可是小於預熱時間
if (uptime > 0 && uptime < warmup) {
// 從新計算權重值
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
// 服務發佈時間大於預熱時間直接返回設置權重值
return weight >= 0 ? weight : 0;
}
}
複製代碼
上面章節大篇幅討論了因爲RT上升形成的線程池打滿問題,如今咱們討論另外一個參數QPS。當上遊流量激增會致使建立大量線程池,也會形成線程池打滿問題。這時若是發現QPS超出了系統承受能力,咱們不得不採用降級方案保護系統,請參看我以前文章《從反脆弱角度談技術系統的高可用性》
本文首先介紹了DUBBO線程模型和線程池策略,而後咱們引出了公式,發現併發量受RT和QPS兩個參數影響,這兩個參數任意一個上升均可以形成線程池打滿問題。生產者出現慢服務或者預熱不充分都有可能形成RT上升,而上游流量激增會形成QPS上升,同時本文也給出瞭解決方案。DUBBO線程池打盡是一個必須重視的問題,但願本文對你們有所幫助。
歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我微信「java_front」一塊兒交流學習