Dubbo學習筆記8:Dubbo的線程模型與線程池策略

Dubbo默認的底層網絡通信使用的是Netty,服務提供方NettyServer使用兩級線程池,其中 EventLoopGroup(boss) 主要用來接受客戶端的連接請求,並把接受的請求分發給 EventLoopGroup(worker) 來處理,boss和worker線程組咱們稱之爲IO線程。數據庫

若是服務提供方的邏輯能迅速完成,而且不會發起新的IO請求,那麼直接在IO線程上處理會更快,由於這減小了線程池調度。緩存

但若是處理邏輯很慢,或者須要發起新的IO請求,好比須要查詢數據庫,則IO線程必須派發請求到新的線程池進行處理,不然IO線程會阻塞,將致使不能接收其它請求。網絡

Dubbo提供的線程模型app

根據請求的消息類被IO線程處理仍是被業務線程池處理,Dubbo提供了下面幾種線程模型:oop

  •  all : (AllDispatcher類)全部消息都派發到業務線程池,這些消息包括請求/響應/鏈接事件/斷開事件/心跳等,這些線程模型以下圖:

  • direct : (DirectDispacher類)全部消息都不派發到業務線程池,所有在IO線程上直接執行,模型以下圖:

  • message : (MessageOnlyDispatcher類)只有請求響應消息派發到業務線程池,其餘鏈接斷開事件/心跳等消息,直接在IO線程上執行,模型圖以下:

  • execution:(ExecutionDispatcher類)只把請求類消息派發到業務線程池處理,可是響應和其它鏈接斷開事件,心跳等消息直接在IO線程上執行,模型以下圖:

  • connection:(ConnectionOrderedDispatcher類)在IO線程上,將鏈接斷開事件放入隊列,有序逐個執行,其它消息派發到業務線程池處理,模型以下圖:

其中AllDispatcher對應的handler代碼以下:性能

public class AllChannelHandler extends WrappedChannelHandler{
    public AllChannelHandler(ChannelHandler handler , URL url){
        super(handler,url);
    }

    // 連接事件,交給業務線程池處理
    public void connected(Channel channel) throws RemotingExcecption{
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CONNECTED));
        }catch(Throwable t){
            throw new ExecutionException("connect event" , channel , getClass() + "error when process connected event.",t);
        }
    }

    // 連接斷開事件,交給業務線程池處理
    public void disconnected(Channel channel) throws RemotingException{
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.DISCONNECTED));
        }catch(Throwable t){
            throw new ExecutionException("disconnect event",channel,getClass()+" error when process disconnected event.",t);
        }
    }

    // 請求響應事件,交給業務線程池處理
    public void received(Channel channel , Object message) throws RemotingException{        
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.RECEIVED,message));
        }catch(Throwable t){
            // TODO 臨時解決線程池滿後異常信息沒法發送到對端的問題。待重構
            // fix 線程池滿了拒絕調用不返回,致使消費者一直等待超時
            if(message instanceof Request && t instanceof RejectedExecutionException ){
                ...
            }
            throw new ExecutionException(message , channel ,getClass() + " error when process received event.",t);
        }
    }

    // 異常處理事件,交給業務線程池處理
    public void caught(Channel channel , Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try{
            cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CAUGHT,exception));
        }catch(Throwable t){
            throw new ExecutionException("caught event",channel,getClass() + " error when process caught event .");
        }
    }
    ...
}

可知全部事件都直接交給業務線程池進行處理了。url

Dubbo提供了經常使用的線程池模型,這些模型能夠知足咱們絕大多數的需求,可是您能夠根據本身的須要進行擴展定製。在服務提供者啓動線程時,咱們會看到何時加載的線程模型的實現。spa

Dubbo提供的線程池策略線程

擴展接口 ThreadPool 的SPI實現有以下幾種:3d

  • fixed:固定大小線程池,啓動時創建線程,不關閉,一直持有(缺省)。
  • cached:緩存線程池,空閒一分鐘自動刪除,須要時重建。
  • limited:可伸縮線程池,但池中的線程數只會增加不會收縮。只增加不收縮的目的是爲了不收縮時忽然帶來大流量引發性能問題。

其中fixed策略對應擴展實現類是FixedThreadPool,代碼以下:

public class FixedThreadPool implements ThreadPool{
    public Executor getExecutor(URL url){
        String name = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY,Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY,Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads , threads , 0 , TimeUnit.MILLISECONDS , queues==0 ? new SynchronousQueue<Runnable>() : (queue < 0 ? new LinkedBlockingQueue<Runnable>(queues)) , new NamedThreadFactory(name,true) , new AbortPolicyWithReport(name,url));
    }
}

可知使用ThreadPoolExecutor建立了核心線程數=最大線程池數=threads的線程池。

Dubbo線程池擴展,這些擴展能夠知足咱們絕大多數的需求,可是您能夠根據本身的須要進行擴展定製。在服務提供者啓動流程時,咱們會看到何時加載的線程池擴展實現。  

相關文章
相關標籤/搜索