Dubbo 線程池模型

前言

你們好,今天開始給你們分享 — Dubbo 專題之 Dubbo 線程池模型。在前面上個章節中咱們討論了 Dubbo SPI,瞭解了 Dubbo SPI 其本質是從 JDK 標準的 SPI (Service Provider Interface) 擴展點發現機制增強而來,同時解決了 Java 中 SPI 的一些缺陷。以及咱們使用 Dubbo SPI 實現自定義能力的拓展。那本章節咱們要討論的 Dubbo 線程模型也是基於 SPI 實現,那什麼是線程模型呢?以及其在咱們的項目中有什麼做用呢?那麼咱們在本章節中進行討論。下面就讓咱們快速開始吧!java

1. 線程模型簡介

小夥伴若是對 Servlet 熟悉就知道,從 Servlet 3.x 開始支持異步非阻塞模式。至於什麼異步非阻塞前面我在前面的章節中有討論小夥伴能夠自行學習以前的文章。咱們經過一個訪問Web應用流程圖簡單說明:git

線程模型1

在上面的流程圖中咱們能夠看到第一個請求發起同步 Web 調用,而後 Web 再發起對第三方服務的調用,整個過程全鏈路是同步調用。第二個請求一樣也是發起同步調用,可是在發起第三方調用的時候切換了線程(基於 Servlet 3.x 咱們不須要手動的建立線程來切換)。這麼作的好處在於咱們能夠用專門處理線程池去作業務處理或第三方服務的調用。那什麼狀況下咱們須要切換線程不使用主線程呢?若是事件處理的邏輯能迅速完成,而且不會發起新的 IO 請求,好比只是在內存中記個標識,則直接在 IO 線程上處理更快,由於減小了線程池調度。但若是事件處理邏輯較慢,或者須要發起新的 IO 請求,好比須要查詢數據庫或其它服務調用時,則必須派發到線程池,不然 IO 線程阻塞,將致使不能接收其它請求。spring

2. 使用方式

那在 Dubbo 中給咱們提供了經過不一樣的派發策略和不一樣的線程池配置的組合來應對不一樣的場景。配置方式以下:數據庫

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

下面咱們簡單描述下dispatcherthreadpool的參數說明:apache

  1. Dispatcher
  • all 全部消息都派發到線程池,包括請求,響應,鏈接事件,斷開事件,心跳等。(默認)
  • direct 全部消息都不派發到線程池,所有在 IO 線程上直接執行。
  • message 只有請求響應消息派發到線程池,其它鏈接斷開事件,心跳等消息,直接在 IO 線程上執行。
  • execution 只有請求消息派發到線程池,不含響應,響應和其它鏈接斷開事件,心跳等消息,直接在 IO 線程上執行。
  • connection 在 IO 線程上,將鏈接斷開事件放入隊列,有序逐個執行,其它消息派發到線程池。
  1. ThreadPool
  • fixed 固定大小線程池,啓動時創建線程,不關閉,一直持有。(默認)
  • cached 緩存線程池,空閒一分鐘自動刪除,須要時重建。
  • limited 可伸縮線程池,但池中的線程數只會增加不會收縮。只增加不收縮的目的是爲了不收縮時忽然來了大流量引發的性能問題。
  • eager 優先建立Worker線程池。在任務數量大於corePoolSize可是小於maximumPoolSize時,優先建立Worker來處理任務。當任務數量大於maximumPoolSize時,將任務放入阻塞隊列中。阻塞隊列充滿時拋出RejectedExecutionException。(相比於cached:cached在任務數量超過maximumPoolSize時直接拋出異常而不是將任務放入阻塞隊列)

3. 使用場景

經過前面的介紹咱們應該明白咱們爲何須要切換線程,遵循一個很簡單的原則:若是咱們處理的任務須要操做新的 IO 或者處理任務須要很長的時間那麼咱們就能夠把這部分工做放到咱們的任務線程池去處理。那麼咱們簡單的總結下在工做常遇到的場景:編程

  1. 計算型服務:在我以前的工做中遇到這樣的一個需求:咱們的車機實時上報數據給服務器,服務器記錄數據而且實時計算和糾正導航數據。那麼這裏咱們須要一個計算型的微服務,主要的工做就是計算和修正實時數據,那麼這個服務就是典型的計算型服務,全部咱們計算過程當中儘可能減小線程的切換並儘量的在一個線程內進行計算。這樣減小線程切換的開銷提供計算速度。
  2. 網關服務:首先咱們須要瞭解什麼是網關,簡單的理解就是全部的服務入口,對每一個服務的調用必須通過網關轉發到對應服務上(相似 Nginx )。那這裏網關主要工做就是服務轉發(鑑權、限流等等),能夠理解爲發起請求。很明顯發起請求就是開啓新的 IO 全部咱們能夠切換到線程池去處理。

4. 示例演示

下面咱們經過以獲取圖書列表爲例進行演示。如下是項目的結構圖:api

idea1

由於這裏咱們主要是對服務提供端的配置,全部咱們主要看dubbo-provider-xml.xml配置內容:緩存

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <!-- 指定分發策略爲:all 線程池:fixed 固定大小爲:100 -->
    <dubbo:protocol port="20880" name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

    <dubbo:application name="demo-provider" metadata-type="remote"/>

    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <bean id="bookFacade" class="com.muke.dubbocourse.test.provider.BookFacadeImpl"/>

    <!--暴露服務爲Dubbo服務-->
    <dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" ref="bookFacade" />

</beans>

上面的 XML 配置中dispatcher="all"指定事件的分發策略、threadpool="fixed" threads="100"指定線程池固定大小爲100服務器

5. 原理分析

這裏分發策略和線程池採用 Dubbo 中的 SPI 方式加載的小夥伴能夠參考前面的 《Dubbo SPI》章節進行了解。下面咱們進入主題,首先看看在 Dubbo 中爲咱們提供的5種事件分發策略:微信

idea2

咱們這裏簡單的分析 all分發策略其它的都是相似的小夥伴自行查閱源碼分析。下面咱們看看org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler核心源碼:

/***
 *@className AllChannelHandler
 *       
 *@description 全部處理分發到線程池去處理
 *       
 *@author <a href="http://youngitman.tech">青年IT男</a>
 *       
 *@date 12:50 2020-03-05
 *       
 *@JunitTest: {@link  }     
 *
 *@version v1.0.0
 *       
**/
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /**
     *
     * 遠程鏈接事件回調
     *
     * @author liyong 
     * @date 1:34 PM 2020/12/6 
     * @param channel 
     * @exception 
     * @return void 
     **/
    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            //鏈接到遠程事件放入線程池執行
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    /**
     *
     * 端口遠程鏈接
     *
     * @author liyong 
     * @date 1:34 PM 2020/12/6 
     * @param channel 
     * @exception 
     * @return void 
     **/
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            //斷開鏈接處理事件放入線程池執行
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    /**
     *
     * 接收到數據回調
     *
     * @author liyong 
     * @date 1:34 PM 2020/12/6 
     * @param channel 
     * @param message 
     * @exception 
     * @return void 
     **/
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            //接收到數據放入線程池處理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    /**
     *
     * 發生異常回調
     *
     * @author liyong 
     * @date 1:35 PM 2020/12/6 
     * @param channel 
     * @param exception 
     * @exception 
     * @return void 
     **/
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            //發生異常放入線程池處理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

從上面的代碼註釋中能夠看到 all 這種處理策略就是全部消息都派發到線程池,包括請求、響應、鏈接事件、斷開事件、心跳等。

接下來咱們看看線程池的處理策略主要支持4種:

idea3

咱們以fixed策略進行分析。咱們看到org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool核心源碼:

/**
 * 建立固定大小線程池
 *
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 */
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        //線程池名稱
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        //線程池大小
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        //隊列大小
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                //若是隊列大小爲0使用同步隊列
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        //不然使用指定大小到阻塞隊列
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

上面的源碼中使用指定大小的隊列建立線程池,若是隊列大小爲0使用同步隊列。

6. 小結

在本小節中咱們主要學習了 Dubbo 中的線程池模型,在 Dubbo 中爲咱們提供了兩種策略調整線程池模型分別是:DispatcherThreadPool。其中Dispatcher提供了5種策略:alldirectmessageexecutionconnectionThreadPool提供了4種策略:fixedcachedlimitedeager。同時咱們分別從源碼中學習了底層的實現邏輯。

本節課程的重點以下:

  1. 理解 Dubbo 中線程模型
  2. 瞭解什麼是 Dispatcher模式
  3. 瞭解什麼是 ThreadPool模式
  4. 瞭解線程模型實現原理

寫在最後

本小節是 Dubbo 入門到精通系列 (《從零開始學習Dubbo》、《Dubbo高階應用》、《Dubbo源碼分析》) 中 《從零開始學習Dubbo》基礎課程最後一小節,感謝你們長期的支持。因爲本人時間精力有限後面課程的相關專題更新可能比較緩慢請多多包含,再次感謝小夥伴的關注。若是想得到最新的專題分享請關注個人微信公衆號。

做者

我的從事金融行業,就任過易極付、思建科技、某網約車平臺等重慶一流技術團隊,目前就任於某銀行負責統一支付系統建設。自身對金融行業有強烈的愛好。同時也實踐大數據、數據存儲、自動化集成和部署、分佈式微服務、響應式編程、人工智能等領域。同時也熱衷於技術分享創立公衆號和博客站點對知識體系進行分享。關注公衆號: 青年IT男 獲取最新技術文章推送!

博客地址: http://youngitman.tech

微信公衆號:

相關文章
相關標籤/搜索