源碼分析Dubbo網絡通信篇之NettyServer網絡事件之線程池

本文主要分析Dubbo線程池的構建過程,主要介紹官方文檔中有關於ThreadPool的種類:java

  • fixed 固定大小線程池,啓動時創建線程,不關閉,一致持有。(缺省)
  • cached :緩存線程池,空閒一分鐘,線程會消費,須要時從新建立新線程。
  • limited :可伸縮線程池,但池中的線程數只會增加不會收縮。
  • eager :優先使用線程來執行新提交任務。(渴望當即執行,而不是進入隊列排隊執行)。配置標籤:< dubbo:protocol threadpool = "fixed" ../>

各類類型的線程池,內部就是根據規則建立不一樣的ThreadPoolExecutor對象,那咱們先簡單回顧一下線程池的基本知識,其構造方法以下所示:緩存

public ThreadPoolExecutor(
         int corePoolSize, // 線程池核心線程數、常駐線程數。
         int maximumPoolSize,  // 線程池中最大線程數量
        long keepAliveTime, // 線程保持活躍時間,(若是線程建立,並空閒
                 //指定值後,線程會被回收,0表示不開啓該特性,其範圍針對   // corePoolSize的線程)
        TimeUnit unit,  // keepAliveTime的時間單位。
        BlockingQueue&lt; Runnable&gt; workQueue,// 任務隊列
        ThreadFactory threadFactory, // 線程工廠類,通常經過該線程工廠,爲線程命名,以便區分線程。
        RejectedExecutionHandler handler) // 拒絕策略。

提交任務流程(線程建立流程)架構

  1. 若是線程池中線程數量小於corePoolSize,則建立一個線程來執行該任務。
  2. 若是線程池中的線程大於等於corePoolSize,則嘗試將任務放入隊列中。
  3. 若是成功將任務放入隊列,則本次提交任務正常結束,若是放入任務隊列失敗則繼續下一步。
  4. 若是線程池中的線程數量小於最大線程數,則建立先的線程,不然執行拒絕策略。 更多有關線程池,請參考做者的另一篇博文:源碼分析JDK線程池

一、fixed 固定大小線程池

public class FixedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<runnable>() :
                        (queues &lt; 0 ? new LinkedBlockingQueue<runnable>()
                                : new LinkedBlockingQueue<runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

實現要點:併發

  1. 首先獲取可配置參數threadname、threads、queues三個參數,分別表明線程池中線程名前綴、線程中最大線程數量、任務隊列長度。
  2. 要實現fixed固定大小線程池,故名思議,就是線程池自建立以來,線程數量始終保持一致。其實現要點是,corePoolSize、maximumPoolSize相等,而且其值等於threads(默認200),而且keepAliveTime=0,表示線程始終活躍。
  3. 任務隊列,若是queues 爲0,則使用SynchronousQueue,若是小於0,則使用無界隊列,若是大於0,則建立容量爲LinkedBlockingQueue的隊列,超過容量,則拒絕入隊。
  4. 線程工廠,NamedThreadFactory,主要設置線程名稱,默認爲Dubbo-thread-序號。
  5. 拒絕策略AbortPolicyWithReport,其主要是若是拒絕任務,首先會打印出詳細日誌,包含線程池的核心參數,而且會dump jstack 日誌,日誌文件默認存儲在 user.home/Dubbo_JStack.log.timestamp,能夠經過 dump.directory 屬性配置,可經過< dubbo:protocol> < dubbo:parameter key =「」 value = ""/> < /dubbo:protocol>。 >這裏再簡單介紹若是隊列長度爲0(默認),爲何是選用SynchronousQueue隊列。 SynchronousQueue的一個簡單理解:調用offer、put以前,必須先調用take,也就是先調用take方法的線程阻塞,而後當別的線程調用offer以後,調用take的線程被喚醒,若是沒有線程調用take方法,一個線程調用offer方法,則會返回false,並不會將元素添加到SynchronousQueue隊列中,由於SynchronousQueue內部的隊列長度爲0。 與該線程池相關的配置屬性:threadname、theadpool、threads、queues。

二、cached 緩存線程池,線程空閒後會被回收

public class CachedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<runnable>() :
                        (queues &lt; 0 ? new LinkedBlockingQueue<runnable>()
                                : new LinkedBlockingQueue<runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

實現要點:既然要實現線程能夠被回收,則必然要設置 keepAliveTime。 故對應線程池核心參數設置,對應以下:分佈式

  • corePoolSize:經過參數corethreads設置,默認爲0
  • maximumPoolSize:經過參數threads設置,默認200
  • keepAliveTime:經過參數alive設置,默認爲60 * 1000
  • workQueue :經過queues參數設置,默認爲0
  • 其餘與fixed相同,則不重複介紹

三、limited 可伸縮線程池,其特徵:線程數只增不減

public class LimitedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<runnable>() :
                        (queues &lt; 0 ? new LinkedBlockingQueue<runnable>()
                                : new LinkedBlockingQueue<runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

就不回收,與cached不一樣的就是 keepAliveTime 的取值不一樣,limited 取值爲:Long.MAX_VALUE,其餘與 cached 相同。ide

四、eager

其核心實現主要由 TaskQueue、EagerThreadPoolExecutor 共同完成。 首先,咱們關注一下 TaskQueued 的 offer方法。高併發

public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();     // [@1](https://my.oschina.net/u/1198)
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getSubmittedTaskCount() &lt; currentPoolThreadSize) {   // @2
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize &lt; executor.getMaximumPoolSize()) {    // [@3](https://my.oschina.net/u/2648711)
            return false;
        }

        // currentPoolThreadSize &gt;= max     // @4
        return super.offer(runnable); 
    }

代碼@1:獲取當前線程池中線程的數量。源碼分析

代碼@2:若是當前已提交到線程池中的任務數量小於當前存在在的線程數,則走默認的提交流程。url

代碼@3:若是當前已提交到線程中的數量大於當前的線程池,併線程池中數量並未達到線程池容許建立的最大線程數時,則返回false,並不入隊,其效果是會建立新的線程來執行。.net

代碼@4:若是當前線程池中的線程已達到容許建立的最大線程數後,走默認的提交任務邏輯。 EagerThreadPoolExecutor#execute

public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();       // @1 
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();   // @2
        }
    }

其核心實現邏輯:若是提交任務失敗,則再走一次默認的任務提交流程。 最總後結一下Eager的核心特性。

public class EagerThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<runnable> taskQueue = new TaskQueue<runnable>(queues &lt;= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}

其核心特性以下:

  1. 首先,其配置參數與cached類型的線程池相同,說明eager也是基於緩存的。
  2. eager與cached類型線程池不一樣的一點是,提交任務後,線程優先於隊列,默認的提交流程是若是線程數達到核心線程數後,新提交的任務是首先進入隊列,但eager是優先建立線程來執行,這有點與公平鎖,非公平鎖同樣的概念了。

>做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區優秀佈道師、CSDN2019博客之星TOP10,維護公衆號:中間件興趣圈目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接加入中間件知識星球 ,一塊兒探討高併發、分佈式服務架構,交流源碼。 在這裏插入圖片描述</runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable>

相關文章
相關標籤/搜索