Dubbo 消息派發的線程模型

歡迎訪問個人博客,同步更新: 楓山別院html

今天咱們要介紹的是Dubbo消息派發的時候,使用的線程模型,Dubbo版本2.8.4。那麼什麼是Dubbo的消息派發呢?好比,Dubbo提供者收到消費者的請求以後,要將這個請求派發給哪一個線程來處理。這個派發給某個線程的規則,就是咱們今天要討論的消息派發的線程模型。apache

請求流程

你們請看下圖,下圖來自Dubbo官方文檔,線程模型api

image.png網絡

咱們解釋一下:框架

  1. 消費者的代理類發起請求ide

  2. 請求通過編碼和序列化,而後經由Netty(或者Mina等)發送給提供者源碼分析

  3. 提供者收到了請求,而後解碼和反序列化等編碼

  4. 根據Dubbo配置,選擇相應的線程,處理業務邏輯url

這裏的第四步,就是咱們今天要討論的內容,Dubbo有哪些處理業務的線程模型以及如何選擇。spa

消息分類

首先,咱們討論下Dubbo有哪些消息,大致上能夠分爲兩類:非業務消息、業務消息。

  • 非業務消息包括:創建鏈接,斷開鏈接,心跳

  • 業務消息包括:請求,響應,異常

這些消息咱們能夠在com.alibaba.dubbo.remoting.ChannelHandler中找到,不包括心跳。

public interface ChannelHandler {
        void connected(Channel channel) throws RemotingException;
        void disconnected(Channel channel) throws RemotingException;
        void sent(Channel channel, Object message) throws RemotingException;
        void received(Channel channel, Object message) throws RemotingException;
        void caught(Channel channel, Throwable exception) throws RemotingException;
}

線程池

針對兩種分類的消息,咱們的線程也能夠分爲兩類:Netty的線程池、業務線程池

  • Netty(Mina等)的線程池:就是底層網絡框架用於處理網絡傳輸的線程池,網絡消息編碼解碼,發送網絡數據等

  • 業務線程池:這個線程池是Dubbo框架建立的,用於處理除了網絡傳輸以外,Dubbo相關的業務。

消息咱們都理清楚了,線程池也都知道了,那麼,哪些消息應該派發到哪一個線程池,這個規則,就是今天的主角。

消息處理線程模型

Dubbo有五種處理的線程模型,

  1. all :全部的消息都派發到業務線程池,包括請求,響應,鏈接事件,斷開事件,心跳等。

  2. direct:全部消息都不派發到業務線程池,所有在Netty(或者Mina等)線程上直接執行。

  3. message:只有請求響應等業務消息派發到業務線程池,其它鏈接斷開事件,心跳等消息,直接在Netty(或者Mina等)線程上執行。

  4. execution:只有請求消息派發到Netty線程池,不含響應,響應和其它鏈接斷開事件,心跳等消息,直接在業務線程上執行。

  5. connection: 將鏈接斷開事件放入隊列,由一個只有1個線程的其餘線程池處理,有序逐個執行,其它消息派發到業務線程池。

源碼分析

派發接口是一個SPI拓展接口,com.alibaba.dubbo.remoting.Dispatcher,這個拓展接口中默認的線程模型是all

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     * 
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // 後兩個參數爲兼容舊配置
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}

若是某個Dubbo服務接口單獨配置了線程模型,那麼會從URL中,按照dispatcherdispatherchannel.handler的key值,找對應的線程模型來處理。後面的兩個參數是爲了兼容老版本Dubbo,dispather就徹底是在老版Dubbo中,這個單詞拼錯了,後來在新版本中改過來了,就必需要兼容這個錯誤的配置。

OK,這個接口有5種實現,就是對應上面的5中線程模型:

image.png

這5種Dispatcher分別有對應的5種ChannelHandler,邏輯都在handler中,以下圖

image.png

以AllChannelHandler爲例,裏面就是對應的connected,disconnected,received,caught的實現,很是的簡單,就是把任務提交到線程池裏。

image.png

每一個任務都是ChannelEventRunnable類,實現了Runnable接口,線程會執行它的run方法。在run方法中,是一個switch,根據消息類型,調用handler對應的方法。咱們以received爲例:

@Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

received調用了reply方法,咱們繼續看看reply方法:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //若是是callback 須要處理高版本調用低版本的問題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

reply中就是查找invoker,調用業務邏輯了,後面就不是本篇的重點了。

相關文章
相關標籤/搜索