Netty,讓咱們的編寫TCP變的很是簡單,而且它在業界運用極其普遍。Dubbo底層默認實現也是經過Netty。
org.apache.dubbo.remoting.transport.netty4.NettyServer就是其默認實現方式。它的類關係以下:
html
從上面的類圖,咱們能夠知道其設計的理念。apache
1)Map<String, Channel> channels 持有與該服務端通訊的TCP通訊Channel,key爲host:port。它是NettyServerHandler內部channels的引用,NettyServerHandler就是Netty處理消息的句柄,由咱們本身編寫。
2)io.netty.channel.Channel channel netty監聽網絡鏈接的channel,由bind()方法返回
3)內部監聽線程爲1個,線程名爲NettyServerBoss。
4)內部IO線程爲可用處理器+1,最多爲32個線程,線程名爲NettyServerWorker。
5)NettyServerHandler 爲dubbo的消息處理器,編解碼器對消息編解碼後,就是扔給它處理。它內部持有一個ChannelHandler,便是NettyServer這個類,從類圖上可知NettyServer實現了ChannelHandler 這個接口。
六、NettyServer 持有一個外部傳進來的ChannelHandler。並會對其進行封裝,截圖以下(截圖比較清晰):
6)NettyCodecAdapter,爲dubbo內部的編解碼器,內部持有編碼器InternalEncoder,解碼器爲InternalDecoder。
7)Dubbo 的業務線程名爲DubboServerHandler。以後再日誌上看到這個名字不要在陌生了。api
答案,確定不是,直接在框架上new,以後確定沒有擴展性可言。沒事情,咱們包裹一層便可,命名爲傳輸層Transporter,它就有bind,connect功能。
NettyTransporter就是其具體的實現之一。服務器
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException { return new NettyServer(url, handler); } @Override public Client connect(URL url, ChannelHandler handler) throws RemotingException { return new NettyClient(url, handler); } }
既然是一個SPI擴展,必然須要經過ExtensionLoader.getExtensionLoader()進行加載,Transporters其工具類的靜態方法getTransporter()就是經過ExtensionLoader進行加載的,而後其靜態方法Transporters.bind()來獲取RemotingServer,即NettyServer。網絡
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
值得注意的是,能夠爲其添加多個ChannelHandler,若是爲多個,則對其封裝爲ChannelHandlerDispatcher,而後傳遞給NettyServer。框架
先引入一個名詞Exchanger,我稱爲信息交換回執器,它實際上是對Transporter的封裝。Exchanger 一樣具備bind()和connect()方法。
Exchanger的bind()和connect()接收一個ExchangeHandler的消息處理句柄,ExchangeHandler 也是一個ChannelHandler,也是用來處理通道消息的,但它對其進行的加強,有一個回執的方法reply()。即接收一個消息後,能夠進行回執,經過reply()。dom
和Transporters同樣,工具類Exchangers一樣的方式經過ExtensionLoader.getExtensionLoader()來獲取特定的Exchanger,默認爲HeaderExchanger。HeaderExchanger#bind()和connect()。
HeaderExchanger是默認的信息交換回執器。能夠看到HeaderExchangeServer 和HeaderExchangeClient分別接收RemoteServer 和Client。而且經過Transporters調用傳入。ExchangeHandler會被包裹成HeaderExchangeHandler,接着在被包裹爲DecodeHandler。
從這裏咱們就能夠解答第五個問題,Transporters#bind()傳入的ChannelHandler 實際上是DecodeHandler。爲啥這裏須要DecodeHandler,實際上是服務端或者客戶端在收到對等端的消息字節數據後,首先解析的是頭部信息,body信息是沒有在netty的編解碼中進行解析的,是到了真正處理消息的時候,經過Decodehandler#received()內部進行解碼的。tcp
實際上,框架真正用到的是以下:
ide
其餘最終都是會調到這個方法,有時間的同窗能夠本身調用下其餘的重載方法。工具
這個問題請查看DubboProtocol的成員變量ExchangeHandler。接下來,咱們來分析這個ExchangeHandler 到底作了什麼。
DubboProtocol內部new 了一個ExchangeHandlerAdapter 對象,也就是ExchangeHandler。該handler主要處理已經Invocation類型的消息。
首先,看下received()方法。
@Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } }
若是消息的類型是Invocation,那麼調用reply方法進行消息應答,若是不是調用超類,也就是
ExchangeHandlerAdapter的received方法,該方法是空的,因此即會丟棄該消息內容。
那咱們來看下,relpy方法主要乾了什麼?註釋以下代碼塊。
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { /** * * 若是消息類型不是Invocation,那麼會拋出異常,通常狀況下不會,在received方法上已經進行的消息類型判斷。 */ if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; //這裏獲得的就是服務Invoker根據inv。 Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback // 看是否是dubbo 回調,以後看下回調內容 if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //調用,這裏就是服務調用,這裏會牽扯到dubbo filter 相關內容,在服務調用時具體堅定 Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); }
從上面的可知道,reply主要就是進行服務的調用,核心語句就是 invoker.invoke(inv)。
那麼是如何找到這個服務提供者的服務呢。來看下getInvoker()。
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { boolean isCallBackServiceInvoke = false; boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = (String) inv.getObjectAttachments().get(PATH_KEY); // if it's callback service on client side //若是該調用是在客服端,可能會有配置Stub類,經過isStubServiceInvoke標註 isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } //callback // 查看是不是本地回調 isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY); inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } // 構建serviceKey,經過端口port,路徑(通常是接口的全限定名)path,版本號version,分組group String serviceKey = serviceKey( port, path, (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); //全部的服務導出都會放在exporterMap對象裏,而後根據key獲取獲得DubboExporter DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } // 接着返回Invoker。 return exporter.getInvoker(); }
當服務調用方與服務提供方創建鏈接和斷開鏈接時,代碼以下:
@Override public void connected(Channel channel) throws RemotingException { invoke(channel, ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if (logger.isDebugEnabled()) { logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, ON_DISCONNECT_KEY); }
都是進行調用invoke方法。那麼invoke方法主要乾了啥?
private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } /** * FIXME channel.getUrl() always binds to a fixed service, and this service is random. * we can choose to use a common service to carry onConnect event if there's no easy way to get the specific * service this connection is binding to. * @param channel * @param url * @param methodKey * @return */ private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]); invocation.setAttachment(PATH_KEY, url.getPath()); invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY)); invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY)); invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY)); if (url.getParameter(STUB_EVENT_KEY, false)) { invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; }
這一篇文章距離上一次已經好久了,主要是遇到了國慶,本身想放鬆一下,接下來還會繼續努力的分析dubbo的一些內容。