Dubbo線程池耗盡原理分析Thread pool is EXHAUSTED

對最近遇到的業務應用dubbo線程池爆滿(異常:RejectedExecutionException:Thread pool is EXHAUSTED)問題進行了分析。java

1、問題回顧:spring

業務應用dubbo配置以下:express

<dubbo:protocol name="dubbo" port="${dubbo.port}" /}

在dubbo的spring配置中,業務應用並無配置threadpool, threads等參數,查看dubbo.io用戶文檔,可知:默認配置爲:apache

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

查看dubbo代碼(版本爲2.5.3)實現:FixedThreadPool.java緩存

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.common.threadpool.support.fixed;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

/**
 * 此線程池啓動時即建立固定大小的線程數,不作任何伸縮,來源於:<code>Executors.newFixedThreadPool()</code>
 * 
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 * @author william.liangf
 */
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
        		queues == 0 ? new SynchronousQueue<Runnable>() : 
        			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
        					: new LinkedBlockingQueue<Runnable>(queues)),
        		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

其中:安全

public class Constants {

    // the other constant
    // ......

    public static final String  DEFAULT_THREAD_NAME = "Dubbo";
    public static final int     DEFAULT_THREADS = 200;
    public static final int     DEFAULT_QUEUES = 0;
}

由代碼可知:dubbo的線程池採用jdk的ThreadPoolExecutor,默認threads數爲200,默認隊列長度爲0,此時默認採用了SynchronousQueue隊列,而若是用戶配置的隊列長度大於0時,則會採用LinkedBlockingQueue隊列。【注意:因而可知,dubbo.io用戶文檔中,默認threads=100是錯誤的,實際爲200】app

SynchronousQueue:是一個緩存爲1的阻塞隊列,某次添加元素後必須等待其餘線程取走後才能繼續添加。less

LinkedBlockingQueue:線程安全的基於鏈表的阻塞隊列,實現了入隊出隊的分離(分別採用putLock和takeLock鎖,所以能夠同時入隊和出隊操做)位於java.util.concurrent包下,是一個無界隊列。ide

ThreadPoolExecutor的完整構造方法的簽名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .

corePoolSize - 池中所保存的線程數,包括空閒線程。
maximumPoolSize-池中容許的最大線程數。
keepAliveTime - 當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。
unit - keepAliveTime 參數的時間單位。
workQueue - 執行前用於保持任務的隊列。此隊列僅保持由 execute方法提交的 Runnable任務。
threadFactory - 執行程序建立新線程時使用的工廠。
handler - 因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序。
ThreadPoolExecutor是Executors類的底層實現。

dubbo默認建立固定大小的線程池(200), 每次提交一個任務就建立一個線程,直到線程數達到線程池大小200,線程池的大小一旦達到最大值就保持不變。若是某個線程由於執行異常而結束,那麼線程池會補充一個新的線程。ui

因爲dubbo默認採用了直接提交的SynchronousQueue工做隊列,因此,全部的task會直接提交給線程池中的某一worker線程,若是沒有可用線程,那麼會拒絕任務的處理,而拋出異常RejectedExecutionException:Thread pool is EXHAUSTED.

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.common.threadpool.support;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;

/**
 * Abort Policy.
 * Log warn info when abort.
 * 
 * @author ding.lid
 */
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    
    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
    
    private final String threadName;
    
    private final URL url;
    
    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" ,
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        throw new RejectedExecutionException(msg);
    }

}

所以,若是系統拋出異常RejectedExecutionException:Thread pool is EXHAUSTED.能夠經過調大dubbo線程池解決該問題,或者下降對TPS的預期。

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="500" />

此處修改了dubbo線程池爲500,以處理更多的任務。

相關文章
相關標籤/搜索