Dubbo分析Serialize層
Dubbo分析之Transport層
Dubbo分析之Exchange 層git
緊接着上文Dubbo分析之Transport層,本文繼續介紹Exchange層,此層官方介紹爲信息交換層:封裝請求響應模式,同步轉異步,以 Request, Response 爲中心,擴展接口爲 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer;下面分別進行介紹github
Exchanger是此層的核心接口類,提供了connect()和bind()接口,分別返回ExchangeClient和ExchangeServer;dubbo提供了此接口的默認實現類HeaderExchanger,代碼以下:segmentfault
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
在實現類中在connect和bind中分別實例化了HeaderExchangeClient和HeaderExchangeServer,傳入的參數是Transporters,能夠認爲這裏就是Transport層的入口類;這裏的ExchangeClient/ExchangeServer其實就是對Client/Server的包裝,同時傳入了本身的ChannelHandler;ChannelHandler已經在Transport層介紹過了,提供了鏈接創建,鏈接端口,發送請求,接受請求等接口;已默認使用的Netty爲例,這裏就是對NettyClient和NettyServer的包裝,同時傳入DecodeHandler,在NettyHandler中被調用;服務器
ExchangeClient自己也繼承於Client,同時也繼承於ExchangeChannel:異步
public interface ExchangeClient extends Client, ExchangeChannel { } public interface ExchangeChannel extends Channel { ResponseFuture request(Object request) throws RemotingException; ResponseFuture request(Object request, int timeout) throws RemotingException; ExchangeHandler getExchangeHandler(); @Override void close(int timeout); }
ExchangeChannel負責將上層的data包裝成Request,而後發送給Transport層;具體的邏輯在HeaderExchangeChannel中:ide
public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
建立了一個Request,在構造器中同時會產生一個RequestId;設置了協議版本,是否雙向通訊,最後設置了真實的業務數據;接下來實例化了一個DefaultFuture類,此類實現了同步轉異步的方式,channel調用send發送請求以後,不須要等待結果,直接將DefaultFuture返回給上層,上層能夠經過調用DefaultFuture的get方法來獲取響應,get方法會阻塞等待獲取服務器的響應纔會返回;Client接收消息在handler裏面,好比Netty在NettyHandler裏面messageReceived方法介紹響應消息,NettyHandler最終會調用上面傳入的DecodeHandler,DecodeHandler會先判斷一下是否已經解碼,若是解碼就直接調用HeaderExchangeHandler,默認已經設置了編碼解碼器,因此會直接調用HeaderExchangeHandler裏面的received方法:ui
public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
服務端和客戶端都會使用此方法,這裏是客戶端接受的是Response,直接調用handleResponse方法:this
static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
接收到響應以後,再去告訴DefaultFuture已經收到響應,DefaultFuture自己存放了requestId對應DefaultFuture的一個ConcurrentHashMap;具體怎麼映射過去,Response也包含一個responseId,此responseId和requestId是相同的;編碼
private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
經過responseId獲取了以前請求時建立的DefaultFuture,而後再更新DefaultFuture內部的response對象,更新完以後在調用Condition的signal方法,用戶喚起經過DefaultFuture的get方法獲取響應的阻塞線程:url
public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); }
能夠發現阻塞要麼被獲取被signal方法喚醒,要麼等待超時;以上大體是客戶端發送獲取響應的流程,下面看看服務器端流程
ExchangeServer繼承於Server,同時提供了兩個包裝服務端Channel的方法
public interface ExchangeServer extends Server { Collection<ExchangeChannel> getExchangeChannels(); ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress); }
服務器端主要用於接收Request消息,而後處理消息,最後把響應發送給客戶端,相關接收消息已經在上面介紹過了,一樣是在HeaderExchangeHandler裏面的received方法中,只不過這裏的消息類型爲Request;
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
首先建立了一個Response,而且指定responseId爲requestId,方便在客戶端定位到具體的DefaultFuture;而後調用handler的reply方法處理消息,返回結果,如何處理的將在後面的protocol層介紹,大體就是經過Request的信息,反射調用Server端的服務,而後返回結果,而後將結果放入Response對象中,經過channel將消息發送客戶端;
本文介紹了Exchange層的大致流程,圍繞Exchanger,ExchangeClient和ExchangeServer展開;請求封裝成Request,響應封裝成Response,客戶端經過異步的方式接收服務器請求;
https://github.com/ksfzhaohui...
https://gitee.com/OutOfMemory...