server和client都是如下方法獲得的,Exchanger這個接口只有這麼一個實現,未來可能其餘更加複雜得到server和cliet方式,如下這種是目前惟一的異步
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
connect和 bind獲得的最終的server和client,Transporters.connect調用方法是如下兩個:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().connect(url, handler); } public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
經過spi的自適應擴展做爲生產實例的中間工廠,這個工廠根據url參數獲得不一樣的transport,若是url裏面指定netty4,那麼就能夠獲得netty4的client
切換不一樣transport怎麼作到的?那就是經過自適應擴展加url自由切換。
回到最上面的,經過bind已經拿到最終的nettyserver,繼續包裹了一層HeaderExchangeServer,這裏面主要處理心跳、channel、future的封裝,以及屏蔽不一樣類型的server(netty、netty4等等)。
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
繼續看這裏,這個handler很長,對於dubbo來講,最裏面這個handler是經過CreateServer方法中的server = Exchangers.bind(url, requestHandler)這個傳入進來的,
這個requestHandler就是dubboprotocl裏面內部的,這個handler只有reply方法,做用就是執行doinvoe,也就是真正履行provider義務的地方,也只會在request來的時候纔會被調用。
HeaderExchangeHandler用來處理request、response的,decodehandler用來解碼。這handler到這裏是否是已經結束了?顯然不是,netty4server初始化的時候:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
日誌裏面常常看到的 「DubboServerHandler」就是這個SERVER_THREAD_POOL_NAME。handler鏈上又有下面這幾個:
MultiMessageHandler:針對multimessage類型消息,在received作攔截。
HeartbeatHandler:針對心跳在received作攔截。
經過dispatch出來的默認的allchannelhandler:對全部io事件作處理,大部分任務都扔到線程池裏面作異步處理,防止阻塞netty線程。這個業務線程池的類型、個數也是url指定。
因此這個handler鏈路上面從外到內:MultiMessageHandler HeartbeatHandler Allchannelhandler DecodeHandler HeadExchangeHandler dubboprotocol裏面自帶的帶有reply方法的handler。
對於netty來講,消息在到達這些handler處理之前,已經被netty的編解碼handler處理了,因此DecodeHandler無關緊要
HeadExchangeHandler看起來意義不大,其實這個是用來阻斷傳遞到dubbo裏面的handler的,它是最後一道防線,用來決定要不要丟給dubbo作reply、doinvoke操做的,最重要的方法:
public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
若是two-way、對方給我request,須要返回response,那麼handleRequest會調用dubboprotocol的reply處理這個request
若是對方給個人response,那麼調用handleResponse(channel, (Response) message);
static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
收到response之後,在這裏找到對應的future,經過future喚醒以前阻塞在發送request之後的業務線程:ide
private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }