Dubbo 線程池源碼解析

本文首發於我的微信公衆號《andyqian》,期待你的關注!面試

前言

以前文章《Java線程池ThreadPoolExecutor》《ThreadPoolExecutor 原理解析》中,分別講述了ThreadPoolExecutor 的概念以及原理,今天就一塊兒來看看其在 Dubbo 框架中的應用。微信

ThreadFactory 與 AbortPolicy

  Dubbo 爲咱們提供了幾種不一樣類型的線程池實現,其底層均使用的是 JDK 中的 ThreadPoolExecutor 線程池。ThreadPoolExecutor 咱們都已經很是熟悉,其構造函數中有幾個很是重要的參數。其中就包括:拒絕策略( ThreadPoolExecutor.AbortPolicy ) 以及 ThreadFactory,在 Dubbo 中自定義了 ThreadPoolExecutor.AbortPolicy 以及 ThreadFactory。在學習線程池以前,咱們先來看看這二者的實現,更有益於後面的理解。多線程

在Dubbo中NamedInternalThreadFactory 爲自定義的線程 ThreadFactory 的子類。其類圖以下:框架

其中 NamedInternalThreadFactory 類,其實現以下所示:ide

public class NamedInternalThreadFactory extends NamedThreadFactory {

    public NamedInternalThreadFactory() {
        super();
    }

    public NamedInternalThreadFactory(String prefix) {
        super(prefix, false);
    }

    public NamedInternalThreadFactory(String prefix, boolean daemon) {
        super(prefix, daemon);
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }
}

其中 NamedThreadFactory 類的實現以下:函數

public class NamedThreadFactory implements ThreadFactory {

    protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);

    protected final AtomicInteger mThreadNum = new AtomicInteger(1);

    protected final String mPrefix;

    protected final boolean mDaemon;

    protected final ThreadGroup mGroup;

    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return mGroup;
    }

到這裏,上述代碼描述的是Dubbo對線程池中線程的命名規則,其做用是爲了方便追蹤信息。工具

接下來,咱們來看下拒絕策略 AbortPolicyWithReport 類的實現,其類圖以下所示:學習

源碼以下:ui

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static final String OS_WIN_PREFIX = "win";

    private static final String OS_NAME_KEY = "os.name";

    private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";

    private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    // 覆蓋 父類 ThreadPoolExecutor.AbortPolicy 的 rejectedExecution 方法。
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       // 構造 warn 參數,其中包括:線程狀態,線程池數量,活躍數量,核心線程池數量,最大線程池數量 等信息。
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
                + "%d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        //  dump  堆棧信息
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

 // 當執行 rejectedExecution 方法時,會執行該方法。將會 dump 堆棧信息 至 DUMP_DIRECTORY 目錄,默認爲:user.name 目錄下。
    private void dumpJStack() {
        long now = System.currentTimeMillis();

        //dump every 10 minutes
        if (now - lastPrintTime < TEN_MINUTES_MILLS) {
            return;
        }

        if (!guard.tryAcquire()) {
            return;
        }

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));

            SimpleDateFormat sdf;

            String os = System.getProperty(OS_NAME_KEY).toLowerCase();

            // window system don't support ":" in file name
            if (os.contains(OS_WIN_PREFIX)) {
                sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
            } else {
                sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
            }

            String dateStr = sdf.format(new Date());
            //try-with-resources
            try (FileOutputStream jStackStream = new FileOutputStream(
                new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
                // 工具類,此處實現省略,有興趣的能夠查看。
                JVMUtil.jstack(jStackStream);
            } catch (Throwable t) {
                logger.error("dump jStack error", t);
            } finally {
                guard.release();
            }
            lastPrintTime = System.currentTimeMillis();
        });
        //must shutdown thread pool ,if not will lead to OOM
        pool.shutdown();
    }
}

上面的代碼不難,都是打日誌,dump 堆棧信息,其目的就是:用於在線程池被打滿時,也就是記錄執行AbortPolicy時現場信息,主要是便於後期的分析與問題排查。this

線程池的實現

  上面講述了Dubbo線程池中自定義的 ThreadFactory 類 以及 AbortPolicyWithReport 類。接下來,咱們繼續講解 Dubbo 提供的不一樣線程池實現,其類圖以下所示:

 

1. LimitedThreadPool 線程池

源碼以下:

public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

其中:

  1. THREAD_NAME_KEY 值爲:threadname ,表示爲:線程名,其默認值爲:Dubbo。

  2. CORE_THREADS_KEY 值爲:corethreads,表示:核心線程池數量,其默認值爲:0。

  3. THREADS_KEY 值爲:threads 表示:最大線程數,默認值爲:200。

  4. QUEUES_KEY 值爲:queues 表示:阻塞隊列大小,默認值爲:0。

     

備註:

  1. 該線程池中的 cores,threads 參數由外部制定,其中 keepAliveTime 值爲:Long.MAX_VALUE,TimeUnit 爲 TimeUnit.MILLISECONDS (毫秒)。(意味着線程池中的全部線程永不過時,理論上大於Long.MAX_VALUE 即會過時,由於其足夠大,這裏能夠看爲是永不過時 )。

  2. 此處使用了三目運算符:

    當 queues = 0 時,BlockingQueue爲SynchronousQueue。

    當 queues < 0 時,則構造一個新的LinkedBlockingQueue。

    當 queues > 0 時,構造一個指定元素的LinkedBlockingQueue。

    queues == 0 ? new SynchronousQueue<Runnable>() :
      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
          : new LinkedBlockingQueue<Runnable>(queues)

該線程池的特色是:能夠建立若干個線程,其默認值爲 200,線程池中的線程生命週期很是長,甚至能夠看作是永不過時。

2.  CachedThreadPool 線程池

源碼:

public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

其中:

  1. THREAD_NAME_KEY 值爲:threadname , 表示爲:線程名,其默認值爲:Dubbo。

  2. CORE_THREADS_KEY 值爲:corethreads,表示爲:核心線程池數量,其默認值爲:0。

  3. THREADS_KEY 值爲:threads,表示:最大線程數,默認值爲:Integer.MAX_VALUE。

  4. QUEUES_KEY 值爲:queues,表示:阻塞隊列大小,默認值爲:0。

  5. ALIVE_KEY 值爲:alive, 表示: keepAliveTime 表示線程池中線程的存活時間,其默認值爲:60 * 1000 (毫秒) 也就是一分鐘。

 

該線程池的特色是:可建立無限多線程(在操做系統的限制下,會遠遠低於Integer.MAX_VALUE值,這裏視爲無限大),其線程的最大存活時間默認爲 1 分鐘。意味着能夠建立無限多線程,可是線程的生命週期默認較短!

3. FixedThreadPool 線程池

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

其中:

  1. THREADS_KEY 值爲:threads,表示:最大線程數,默認值爲:200。

  2. QUEUES_KEY 值爲:queues,表示:阻塞隊列大小,默認值爲:0。

  3. corePoolSize,maximumPoolSize 的線程數量均爲:threads,(也就意味着核心線程數等於最大線程數)。

  4. keepAliveTime 的默認值爲0,當線程數大於corePoolSize 時,多餘的空閒線程會當即終止。

     

該線程池的特色是:該線程池中corePoolSize 數量 與 maxinumPoolSize 數量一致,當提交的任務大於核心線程池時,則會將其放入到LinkedBlockingQueue隊列中等待執行,也是Dubbo中默認使用的線程池。

4. EagerThreadPool 線程池

源碼:

public class EagerThreadPool implements ThreadPool {

        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

            // init queue and executor
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
}

備註:

  該線程池與上面的線程池實現方式有些不同,上面是直接使用了ThreadPoolExecutor 類的構造函數。在該線程池實現中,首先構造了一個自定義的 EagerThreadPoolExecutor 線程池,其底層實現也是基於 ThreadPoolExecutor 類的,其代碼以下所示:

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 執行任務數依次遞減
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // 提交任務書 依次遞加。
        submittedTaskCount.incrementAndGet();
        try {
          // 調用父類方法執行線程任務
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // 將任務從新添加到隊列中
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
               //若是添加失敗,則減小任務數,並拋出異常。
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }

在這裏咱們發現,在 EagerThreadPoolExecutor 類中,重載了父類ThreadPoolExecutor 類的幾個方法,分別以下:afterExecuteexecute方法。分別加入 submittedTaskCount 屬性進行任務的統計,當父類的execute方法拋出 RejectedExecutionExcetion 異常時,則會將任務從新放入隊列中執行,其TaskQueue代碼以下:

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        // 當提交的任務數,小於 當前線程時,則之間調用父類的offer 方法。
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        // 噹噹前線程數大小小於,最大線程數時,則直接返回false,建立worker。
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}

該線程池的特色是:能夠從新將拒絕掉的task,從新添加的work queue中執行。至關於有一個重試機制!

結語

  經過上面的分析,我相信你們對Dubbo中線程池應該有所瞭解。若是還有不清楚的地方,能夠經過debug的方式進行跟蹤分析。其實在不少的開源框架中,都有自定義的線程池,但其底層最終使用的仍是 ThreadPoolExecutor 線程池,這個知識點建議你們必定要掌握,不管是實際工做仍是面試,都是一個經常使用的知識點。


 

相關閱讀:

你所不知道的 BigDecimal

ThreadPoolExecutor 原理解析

Java線程池ThreadPoolExecutor

使用 Mybatis 真心不要偷懶!

相關文章
相關標籤/搜索