歡迎訪問個人博客,同步更新: 楓山別院html
今天咱們要介紹的是Dubbo消息派發的時候,使用的線程模型,Dubbo版本2.8.4。那麼什麼是Dubbo的消息派發呢?好比,Dubbo提供者收到消費者的請求以後,要將這個請求派發給哪一個線程來處理。這個派發給某個線程的規則,就是咱們今天要討論的消息派發的線程模型。apache
請求流程
你們請看下圖,下圖來自Dubbo官方文檔,線程模型:api
image.png網絡
咱們解釋一下:框架
-
消費者的代理類發起請求ide
-
請求通過編碼和序列化,而後經由Netty(或者Mina等)發送給提供者源碼分析
-
提供者收到了請求,而後解碼和反序列化等編碼
-
根據Dubbo配置,選擇相應的線程,處理業務邏輯url
這裏的第四步,就是咱們今天要討論的內容,Dubbo有哪些處理業務的線程模型以及如何選擇。spa
消息分類
首先,咱們討論下Dubbo有哪些消息,大致上能夠分爲兩類:非業務消息、業務消息。
-
非業務消息包括:創建鏈接,斷開鏈接,心跳
-
業務消息包括:請求,響應,異常
這些消息咱們能夠在com.alibaba.dubbo.remoting.ChannelHandler中找到,不包括心跳。
public interface ChannelHandler { void connected(Channel channel) throws RemotingException; void disconnected(Channel channel) throws RemotingException; void sent(Channel channel, Object message) throws RemotingException; void received(Channel channel, Object message) throws RemotingException; void caught(Channel channel, Throwable exception) throws RemotingException; }
線程池
針對兩種分類的消息,咱們的線程也能夠分爲兩類:Netty的線程池、業務線程池
-
Netty(Mina等)的線程池:就是底層網絡框架用於處理網絡傳輸的線程池,網絡消息編碼解碼,發送網絡數據等
-
業務線程池:這個線程池是Dubbo框架建立的,用於處理除了網絡傳輸以外,Dubbo相關的業務。
消息咱們都理清楚了,線程池也都知道了,那麼,哪些消息應該派發到哪一個線程池,這個規則,就是今天的主角。
消息處理線程模型
Dubbo有五種處理的線程模型,
-
all
:全部的消息都派發到業務線程池,包括請求,響應,鏈接事件,斷開事件,心跳等。 -
direct
:全部消息都不派發到業務線程池,所有在Netty(或者Mina等)線程上直接執行。 -
message
:只有請求響應等業務消息派發到業務線程池,其它鏈接斷開事件,心跳等消息,直接在Netty(或者Mina等)線程上執行。 -
execution
:只有請求消息派發到Netty線程池,不含響應,響應和其它鏈接斷開事件,心跳等消息,直接在業務線程上執行。 -
connection
: 將鏈接斷開事件放入隊列,由一個只有1個線程的其餘線程池處理,有序逐個執行,其它消息派發到業務線程池。
源碼分析
派發接口是一個SPI拓展接口,com.alibaba.dubbo.remoting.Dispatcher,這個拓展接口中默認的線程模型是all
。
@SPI(AllDispatcher.NAME) public interface Dispatcher { /** * dispatch the message to threadpool. * * @param handler * @param url * @return channel handler */ @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // 後兩個參數爲兼容舊配置 ChannelHandler dispatch(ChannelHandler handler, URL url); }
若是某個Dubbo服務接口單獨配置了線程模型,那麼會從URL中,按照dispatcher
,dispather
,channel.handler
的key值,找對應的線程模型來處理。後面的兩個參數是爲了兼容老版本Dubbo,dispather
就徹底是在老版Dubbo中,這個單詞拼錯了,後來在新版本中改過來了,就必需要兼容這個錯誤的配置。
OK,這個接口有5種實現,就是對應上面的5中線程模型:
image.png
這5種Dispatcher分別有對應的5種ChannelHandler,邏輯都在handler中,以下圖
image.png
以AllChannelHandler爲例,裏面就是對應的connected,disconnected,received,caught的實現,很是的簡單,就是把任務提交到線程池裏。
image.png
每一個任務都是ChannelEventRunnable類,實現了Runnable接口,線程會執行它的run方法。在run方法中,是一個switch,根據消息類型,調用handler對應的方法。咱們以received爲例:
@Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } }
received調用了reply方法,咱們繼續看看reply方法:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); //若是是callback 須要處理高版本調用低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true; break; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv ); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }
reply中就是查找invoker,調用業務邏輯了,後面就不是本篇的重點了。