目標:介紹Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的源碼解析。
上一篇文章我講的是dubbo框架設計中Transport層,這篇文章我要講的是它的上一層Exchange層,也就是信息交換層。官方文檔對這一層的解釋是封裝請求響應模式,同步轉異步,以 Request, Response爲中心,擴展接口爲 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。java
這一層的設計意圖是什麼?它應該算是在信息傳輸層上又作了部分裝飾,爲了適應rpc調用的一些需求,好比rpc調用中一次請求只關心它所對應的響應,這個時候只是一個message消息傳輸過來,是沒法區分這是新的請求仍是上一個請求的響應,這種相似於冪等性的問題以及rpc異步處理返回結果、內置事件等特性都是在Transport層沒法解決知足的,全部在Exchange層講message分紅了request和response兩種類型,而且在這兩個模型上增長一些系統字段來處理問題。具體我會在下面講到。而dubbo把一條消息分爲了協議頭和內容兩部分:協議頭包括系統字段,例如編號等,內容包括具體請求的參數和響應的結果等。在exchange層中大量邏輯都是基於協議頭的。git
如今對這一層的設計意圖大體應該有所瞭解了吧,如今來看看exchange的類圖:github
我講解的順序仍是按照類圖從上而下,分塊講解,忽略綠色的test類。segmentfault
public interface ExchangeChannel extends Channel { ResponseFuture request(Object request) throws RemotingException; ResponseFuture request(Object request, int timeout) throws RemotingException; ExchangeHandler getExchangeHandler(); @Override void close(int timeout); }
該接口是信息交換通道接口,有四個方法,前兩個是發送請求消息,區別就是第二個發送請求有超時的參數,getExchangeHandler方法就是返回一個信息交換處理器,第四個是須要覆寫父類的方法。api
該類實現了ExchangeChannel,是基於協議頭的信息交換通道。數組
private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class); /** * 通道的key值 */ private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL"; /** * 通道 */ private final Channel channel; /** * 是否關閉 */ private volatile boolean closed = false;
上述屬性比較簡單,仍是放一下這個類的屬性是由於該類中有channel屬性,也就是說HeaderExchangeChannel是Channel的裝飾器,每一個實現方法都會調用channel的方法。緩存
static HeaderExchangeChannel getOrAddChannel(Channel ch) { if (ch == null) { return null; } // 得到通道中的HeaderExchangeChannel HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY); if (ret == null) { // 建立一個HeaderExchangeChannel實例 ret = new HeaderExchangeChannel(ch); // 若是通道鏈接 if (ch.isConnected()) { // 加入屬性值 ch.setAttribute(CHANNEL_KEY, ret); } } return ret; } static void removeChannelIfDisconnected(Channel ch) { // 若是通道斷開鏈接 if (ch != null && !ch.isConnected()) { // 移除屬性值 ch.removeAttribute(CHANNEL_KEY); } }
該靜態方法作了HeaderExchangeChannel的建立和銷燬,而且生命週期隨channel銷燬而銷燬。服務器
@Override public void send(Object message) throws RemotingException { send(message, getUrl().getParameter(Constants.SENT_KEY, false)); } @Override public void send(Object message, boolean sent) throws RemotingException { // 若是通道關閉,拋出異常 if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!"); } // 判斷消息的類型 if (message instanceof Request || message instanceof Response || message instanceof String) { // 發送消息 channel.send(message, sent); } else { // 新建一個request實例 Request request = new Request(); // 設置信息的版本 request.setVersion(Version.getProtocolVersion()); // 該請求不須要響應 request.setTwoWay(false); // 把消息傳入 request.setData(message); // 發送消息 channel.send(request, sent); } }
該方法是在channel的send方法上加上了request和response模型,最後再調用channel.send,起到了裝飾器的做用。微信
@Override public ResponseFuture request(Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { // 若是通道關閉,則拋出異常 if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request.建立請求 Request req = new Request(); // 設置版本號 req.setVersion(Version.getProtocolVersion()); // 設置須要響應 req.setTwoWay(true); // 把請求數據傳入 req.setData(request); // 建立DefaultFuture對象,能夠從future中主動得到請求對應的響應信息 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { // 發送請求消息 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
該方法是請求方法,用Request模型把請求內容裝飾起來,而後發送一個Request類型的消息,而且返回DefaultFuture實例,DefaultFuture我會在後面講到。多線程
cloes方法也重寫了,我就再也不多說,由於比較簡單,沒有重點,其餘方法都是直接調用channel屬性的方法。
該接口繼承了Client和ExchangeChannel,是信息交換客戶端接口,其中沒有定義多餘的方法。
該類實現了ExchangeClient接口,是基於協議頭的信息交互客戶端類,一樣它是Client、Channel的適配器。在該類的源碼中能夠看到全部的實現方法都是調用了client和channel屬性的方法。該類主要的做用就是增長了心跳功能,爲何要增長心跳功能呢,對於長鏈接,一些拔網線等物理層的斷開,會致使TCP的FIN消息來不及發送,對方收不到斷開事件,那麼就須要用到發送心跳包來檢測鏈接是否斷開。consumer和provider斷開,處理措施不同,會分別作出重連和關閉通道的操做。
private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class); /** * 定時器線程池 */ private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true)); /** * 客戶端 */ private final Client client; /** * 信息交換通道 */ private final ExchangeChannel channel; // heartbeat timer /** * 心跳定時器 */ private ScheduledFuture<?> heartbeatTimer; // heartbeat(ms), default value is 0 , won't execute a heartbeat. /** * 心跳週期,間隔多久發送心跳消息檢測一次 */ private int heartbeat; /** * 心跳超時時間 */ private int heartbeatTimeout;
該類的屬性除了須要適配的屬性外,其餘都是跟心跳相關屬性。
public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; // 建立信息交換通道 this.channel = new HeaderExchangeChannel(client); // 得到dubbo版本 String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); //得到心跳週期配置,若是沒有配置,而且dubbo是1.0版本的,則這隻爲1分鐘,不然設置爲0 this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); // 得到心跳超時配置,默認是心跳週期的三倍 this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); // 若是心跳超時時間小於心跳週期的兩倍,則拋出異常 if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } if (needHeartbeat) { // 開啓心跳 startHeartbeatTimer(); } }
構造函數就是對一些屬性初始化設置,優先從url中獲取。心跳超時時間小於心跳週期的兩倍就拋出異常,意思就是至少重試兩次心跳檢測。
private void startHeartbeatTimer() { // 中止現有的心跳線程 stopHeartbeatTimer(); // 若是須要心跳 if (heartbeat > 0) { // 建立心跳定時器 heartbeatTimer = scheduled.scheduleWithFixedDelay( // 新建一個心跳線程 new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { // 返回一個只包含HeaderExchangeClient對象的不可變列表 return Collections.<Channel>singletonList(HeaderExchangeClient.this); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
該方法就是開啓心跳。利用心跳定時器來作到定時檢測心跳。由於這是信息交換客戶端類,全部這裏的只是返回包含HeaderExchangeClient對象的不可變列表,由於客戶端跟channel是一一對應的,只有這一個該客戶端自己的channel須要心跳。
private void stopHeartbeatTimer() { if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) { try { // 取消定時器 heartbeatTimer.cancel(true); // 取消大量已排隊任務,用於回收空間 scheduled.purge(); } catch (Throwable e) { if (logger.isWarnEnabled()) { logger.warn(e.getMessage(), e); } } } heartbeatTimer = null; }
該方法是中止現有心跳,也就是中止定時器,釋放空間。
其餘方法都是調用channel和client屬性的方法。
該類實現了Runnable接口,實現的是心跳任務,裏面包含了核心的心跳策略。
/** * 通道管理 */ private ChannelProvider channelProvider; /** * 心跳間隔 單位:ms */ private int heartbeat; /** * 心跳超時時間 單位:ms */ private int heartbeatTimeout;
後兩個屬性跟HeaderExchangeClient中的屬性含義同樣,第一個是該類本身內部的一個接口:
interface ChannelProvider { // 得到全部的通道集合,須要心跳的通道數組 Collection<Channel> getChannels(); }
該接口就定義了一個方法,得到須要心跳的通道集合。可想而知,會對集合內的通道都作心跳檢測。
@Override public void run() { try { long now = System.currentTimeMillis(); // 遍歷全部通道 for (Channel channel : channelProvider.getChannels()) { // 若是通道關閉了,則跳過 if (channel.isClosed()) { continue; } try { // 最後一次接收到消息的時間戳 Long lastRead = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP); // 最後一次發送消息的時間戳 Long lastWrite = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 若是最後一次接收或者發送消息到時間到如今的時間間隔超過了心跳間隔時間 if ((lastRead != null && now - lastRead > heartbeat) || (lastWrite != null && now - lastWrite > heartbeat)) { // 建立一個request Request req = new Request(); // 設置版本號 req.setVersion(Version.getProtocolVersion()); // 設置須要獲得響應 req.setTwoWay(true); // 設置事件類型,爲心跳事件 req.setEvent(Request.HEARTBEAT_EVENT); // 發送心跳請求 channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } // 若是最後一次接收消息的時間到如今已經超過了超時時間 if (lastRead != null && now - lastRead > heartbeatTimeout) { logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); // 若是該通道是客戶端,也就是請求的服務器掛掉了,客戶端嘗試重連服務器 if (channel instanceof Client) { try { // 從新鏈接服務器 ((Client) channel).reconnect(); } catch (Exception e) { //do nothing } } else { // 若是不是客戶端,也就是是服務端返回響應給客戶端,可是客戶端掛掉了,則服務端關閉客戶端鏈接 channel.close(); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } } } catch (Throwable t) { logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); } }
該方法中是心跳機制的核心邏輯。注意如下幾個點:
public interface ResponseFuture { Object get() throws RemotingException; Object get(int timeoutInMillis) throws RemotingException; void setCallback(ResponseCallback callback); boolean isDone(); }
該接口是響應future接口,該接口的設計意圖跟java.util.concurrent.Future很相似。發送出去的消息,潑出去的水,只有等到對方主動響應才能獲得結果,可是請求方須要去主動回去該請求的結果,就顯得有些艱難,全部產生了這樣一個接口,它可以獲取任務執行結果、能夠覈對請求消息是否被響應,還能設置回調來支持異步。
該類實現了ResponseFuture接口,其中封裝了處理響應的邏輯。你能夠把DefaultFuture當作是一箇中介,買房和賣房都經過這個中介進行溝通,中介擁有着買房者的信息request和賣房者的信息response,而且促成他們之間的買賣。
private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class); /** * 通道集合 */ private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); /** * Future集合,key爲請求編號 */ private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); // invoke id. /** * 請求編號 */ private final long id; /** * 通道 */ private final Channel channel; /** * 請求 */ private final Request request; /** * 超時 */ private final int timeout; /** * 鎖 */ private final Lock lock = new ReentrantLock(); /** * 完成狀況,控制多線程的休眠與喚醒 */ private final Condition done = lock.newCondition(); /** * 建立開始時間 */ private final long start = System.currentTimeMillis(); /** * 發送請求時間 */ private volatile long sent; /** * 響應 */ private volatile Response response; /** * 回調 */ private volatile ResponseCallback callback;
能夠看到,該類的屬性包含了request、response、channel三個實例,在該類中,把請求和響應經過惟一的id一一對應起來。作到異步處理返回結果時能給準確的返回給對應的請求。能夠看到屬性中有兩個集合,分別是通道集合和future集合,也就是該類自己也是全部 DefaultFuture 的管理容器。
public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; // 設置請求編號 this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // put into waiting map.,加入到等待集合中 FUTURES.put(id, this); CHANNELS.put(id, channel); }
構造函數比較簡單,每個DefaultFuture實例都跟每個請求一一對應,被存入到集合中管理起來。
public static void closeChannel(Channel channel) { // 遍歷通道集合 for (long id : CHANNELS.keySet()) { if (channel.equals(CHANNELS.get(id))) { // 經過請求id得到future DefaultFuture future = getFuture(id); if (future != null && !future.isDone()) { // 建立一個關閉通道的響應 Response disconnectResponse = new Response(future.getId()); disconnectResponse.setStatus(Response.CHANNEL_INACTIVE); disconnectResponse.setErrorMessage("Channel " + channel + " is inactive. Directly return the unFinished request : " + future.getRequest()); // 接收該關閉通道而且請求未完成的響應 DefaultFuture.received(channel, disconnectResponse); } } } }
該方法是關閉不活躍的通道,而且返回請求未完成。也就是關閉指定channel的請求,返回的是請求未完成。
public static void received(Channel channel, Response response) { try { // future集合中移除該請求的future,(響應id和請求id一一對應的) DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { // 接收響應結果 future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { // 通道集合移除該請求對應的通道,表明着這一次請求結束 CHANNELS.remove(response.getId()); } }
該方法是接收響應,也就是某個請求獲得了響應,那麼表明此次請求任務完成,全部須要把future從集合中移除。具體的接收響應結果在doReceived方法中實現。
private void doReceived(Response res) { // 得到鎖 lock.lock(); try { // 設置響應 response = res; if (done != null) { // 喚醒等待 done.signal(); } } finally { // 釋放鎖 lock.unlock(); } if (callback != null) { // 執行回調 invokeCallback(callback); } }
能夠看到,當接收到響應後,會把等待的線程喚醒,而後執行回調來處理該響應結果。
private void invokeCallback(ResponseCallback c) { ResponseCallback callbackCopy = c; if (callbackCopy == null) { throw new NullPointerException("callback cannot be null."); } c = null; Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null. url:" + channel.getUrl()); } // 若是響應成功,返回碼是20 if (res.getStatus() == Response.OK) { try { // 使用響應結果執行 完成 後的邏輯 callbackCopy.done(res.getResult()); } catch (Exception e) { logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e); } //超時,回調處理成超時異常 } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { try { TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); // 回調處理異常 callbackCopy.caught(te); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); } // 其餘狀況處理成RemotingException異常 } else { try { RuntimeException re = new RuntimeException(res.getErrorMessage()); callbackCopy.caught(re); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); } } }
該方法是執行回調來處理響應結果。分爲了三種狀況:
具體的處理都在ResponseCallback接口的實現類裏執行,後面我會講到。
@Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { // 超時時間默認爲1s if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } // 若是請求沒有完成,也就是尚未響應返回 if (!isDone()) { long start = System.currentTimeMillis(); // 得到鎖 lock.lock(); try { // 輪詢 等待請求是否完成 while (!isDone()) { // 線程阻塞等待 done.await(timeout, TimeUnit.MILLISECONDS); // 若是請求完成或者超時,則結束 if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { // 釋放鎖 lock.unlock(); } // 若是沒有收到響應,則拋出超時的異常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } // 返回響應 return returnFromResponse(); }
該方法是實現了ResponseFuture定義的方法,是得到該future對應的請求對應的響應結果,其實future、請求、響應都是一一對應的。其中若是還沒獲得響應,則會線程阻塞等待,等到有響應結果或者超時,才返回。返回的邏輯在returnFromResponse中實現。
private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } // 若是正常返回,則返回響應結果 if (res.getStatus() == Response.OK) { return res.getResult(); } // 若是超時,則拋出超時異常 if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } // 其餘 拋出RemotingException異常 throw new RemotingException(channel, res.getErrorMessage()); }
這代碼跟invokeCallback方法中差很少,都是把響應分了三種狀況。
public void cancel() { // 建立一個取消請求的響應 Response errorResult = new Response(id); errorResult.setErrorMessage("request future has been canceled."); response = errorResult; // 從集合中刪除該請求 FUTURES.remove(id); CHANNELS.remove(id); }
該方法是取消一個請求,能夠直接關閉一個請求,也就是值建立一個響應來回應該請求,把response值設置到該請求對於到future中,作到了中斷請求的做用。該方法跟closeChannel的區別是closeChannel中對response的狀態設置了CHANNEL_INACTIVE,而cancel方法是中途被主動取消的,雖然有response值,可是並無一個響應狀態。
private static class RemotingInvocationTimeoutScan implements Runnable { @Override public void run() { while (true) { try { for (DefaultFuture future : FUTURES.values()) { // 已經完成,跳過掃描 if (future == null || future.isDone()) { continue; } // 超時 if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { // create exception response.,建立一個超時的響應 Response timeoutResponse = new Response(future.getId()); // set timeout status.,設置超時狀態,是服務端側超時仍是客戶端側超時 timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); // 設置錯誤信息 timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response.,接收建立的超時響應 DefaultFuture.received(future.getChannel(), timeoutResponse); } } // 睡眠 Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } }
該方法是掃描調用超時任務的線程,每次都會遍歷future集合,檢測請求是否超時了,若是超時則建立一個超時響應來回應該請求。
static { // 開啓一個後臺掃描調用超時任務 Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); th.setDaemon(true); th.start(); }
開啓一個後臺線程進行掃描的邏輯寫在了靜態代碼塊裏面,只開啓一次。
該類實現了ResponseFuture,目前沒有用到,很簡單的實現,我就很少說了。
該接口繼承了ChannelHandler, TelnetHandler接口,是信息交換處理器接口。
public interface ExchangeHandler extends ChannelHandler, TelnetHandler { /** * reply. * 回覆請求結果 * @param channel * @param request * @return response * @throws RemotingException */ Object reply(ExchangeChannel channel, Object request) throws RemotingException; }
該接口只定義了一個回覆請求結果的方法,返回的是請求結果。
該類實現了ExchangeHandler接口, 是信息交換處理器調度器類,也就是對應不一樣的事件,選擇不一樣的處理器去處理。該類中有三個屬性,分別對應了三種事件:
/** * 回覆者調度器 */ private final ReplierDispatcher replierDispatcher; /** * 通道處理器調度器 */ private final ChannelHandlerDispatcher handlerDispatcher; /** * Telnet 命令處理器 */ private final TelnetHandler telnetHandler;
若是事件是跟通道處理器有關的,就調用通道處理器來處理,好比:
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Object reply(ExchangeChannel channel, Object request) throws RemotingException { return ((Replier) replierDispatcher).reply(channel, request); } @Override public void connected(Channel channel) { handlerDispatcher.connected(channel); } @Override public String telnet(Channel channel, String message) throws RemotingException { return telnetHandler.telnet(channel, message); }
能夠看到以上三種事件,回覆請求結果須要回覆者調度器來處理,鏈接須要通道處理器調度器來處理,telnet消息須要Telnet命令處理器來處理。
該類繼承了TelnetHandlerAdapter,實現了ExchangeHandler,是信息交換處理器的適配器類。
public abstract class ExchangeHandlerAdapter extends TelnetHandlerAdapter implements ExchangeHandler { @Override public Object reply(ExchangeChannel channel, Object msg) throws RemotingException { // 直接返回null return null; } }
該類直接讓ExchangeHandler定義的方法reply返回null,交由它的子類選擇性的去實現具體的回覆請求結果。
該接口繼承了Server接口,定義了兩個方法:
public interface ExchangeServer extends Server { /** * get channels. * 得到通道集合 * @return channels */ Collection<ExchangeChannel> getExchangeChannels(); /** * get channel. * 根據遠程地址得到對應的信息通道 * @param remoteAddress * @return channel */ ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress); }
該接口比較好理解,而且在Server接口基礎上新定義了兩個方法。直接來看看它的實現類吧。
該類實現了ExchangeServer接口,是基於協議頭的信息交換服務器實現類,HeaderExchangeServer是Server的裝飾器,每一個實現方法都會調用server的方法。
protected final Logger logger = LoggerFactory.getLogger(getClass()); /** * 線程池 */ private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory( "dubbo-remoting-server-heartbeat", true)); /** * 服務器 */ private final Server server; // heartbeat timer /** * 心跳定時器 */ private ScheduledFuture<?> heartbeatTimer; // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat. /** * 心跳週期 */ private int heartbeat; /** * 心跳超時時間 */ private int heartbeatTimeout; /** * 信息交換服務器是否關閉 */ private AtomicBoolean closed = new AtomicBoolean(false);
該類裏面的不少實現跟HeaderExchangeClient差很少,包括心跳檢測等邏輯。看得懂上述我講的HeaderExchangeClient的屬性,想必這裏的屬性應該也很簡單了。
public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; //得到心跳週期配置,若是沒有配置,默認設置爲0 this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); // 得到心跳超時配置,默認是心跳週期的三倍 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); // 若是心跳超時時間小於心跳週期的兩倍,則拋出異常 if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } // 開始心跳 startHeartbeatTimer(); } public Server getServer() { return server; }
構造函數就是對屬性的設置,心跳的機制以及默認值都跟HeaderExchangeClient中的如出一轍。
private boolean isRunning() { Collection<Channel> channels = getChannels(); // 遍歷全部鏈接該服務器的通道 for (Channel channel : channels) { /** * If there are any client connections, * our server should be running. */ // 只要有任何一個客戶端鏈接,則服務器還運行着 if (channel.isConnected()) { return true; } } return false; }
該方法是檢測服務器是否還運行,只要有一個客戶端鏈接着,就算服務器運行着。
@Override public void close() { // 關閉線程池和心跳檢測 doClose(); // 關閉服務器 server.close(); } @Override public void close(final int timeout) { // 開始關閉 startClose(); if (timeout > 0) { final long max = (long) timeout; final long start = System.currentTimeMillis(); if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) { // 發送 READONLY_EVENT事件給全部鏈接該服務器的客戶端,表示 Server 不可讀了。 sendChannelReadOnlyEvent(); } // 當服務器還在運行,而且沒有超時,睡眠,也就是等待timeout左右時間在進行關閉 while (HeaderExchangeServer.this.isRunning() && System.currentTimeMillis() - start < max) { try { Thread.sleep(10); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } // 關閉線程池和心跳檢測 doClose(); // 延遲關閉 server.close(timeout); }
兩個close方法,第二個close方法是優雅的關閉,有必定的延時來讓一些響應或者操做作完。關閉分兩個步驟,第一個就是關閉信息交換服務器中的線程池和心跳檢測,而後纔是關閉服務器。
private void sendChannelReadOnlyEvent() { // 建立一個READONLY_EVENT事件的請求 Request request = new Request(); request.setEvent(Request.READONLY_EVENT); // 不須要響應 request.setTwoWay(false); // 設置版本 request.setVersion(Version.getProtocolVersion()); Collection<Channel> channels = getChannels(); // 遍歷鏈接的通道,進行通知 for (Channel channel : channels) { try { // 經過通道還鏈接着,則發送通知 if (channel.isConnected()) channel.send(request, getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY, true)); } catch (RemotingException e) { logger.warn("send cannot write message error.", e); } } }
在關閉服務器中有一個操做就是發送事件READONLY_EVENT,告訴客戶端該服務器不可讀了,就是該方法實現的,逐個通知鏈接的客戶端該事件。
private void doClose() { if (!closed.compareAndSet(false, true)) { return; } // 中止心跳檢測 stopHeartbeatTimer(); try { // 關閉線程池 scheduled.shutdown(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
該方法就是close方法調用到的中止心跳檢測和關閉線程池。
@Override public Collection<ExchangeChannel> getExchangeChannels() { Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>(); // 得到鏈接該服務器通道集合 Collection<Channel> channels = server.getChannels(); if (channels != null && !channels.isEmpty()) { // 遍歷通道集合,爲每一個通道都建立信息交換通道,而且加入信息交換通道集合 for (Channel channel : channels) { exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel)); } } return exchangeChannels; }
該方法是返回鏈接該服務器信息交換通道集合。邏輯就是先得到通道集合,在根據通道來建立信息交換通道,而後返回信息通道集合。
@Override public void reset(URL url) { // 重置屬性 server.reset(url); try { // 重置的邏輯跟構造函數同樣設置 if (url.hasParameter(Constants.HEARTBEAT_KEY) || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) { int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat); int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3); if (t < h * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } if (h != heartbeat || t != heartbeatTimeout) { heartbeat = h; heartbeatTimeout = t; // 從新開始心跳 startHeartbeatTimer(); } } } catch (Throwable t) { logger.error(t.getMessage(), t); } }
該方法就是重置屬性,重置後,從新開始心跳,設置心跳屬性的機制跟構造函數同樣。
private void startHeartbeatTimer() { // 先中止現有的心跳檢測 stopHeartbeatTimer(); if (heartbeat > 0) { // 建立心跳定時器 heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { // 返回一個不可修改的鏈接該服務器的信息交換通道集合 return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
該方法是開始心跳,跟HeaderExchangeClient類中的開始心跳方法惟一區別是得到的通道不同,客戶端跟通道是一一對應的,全部只要對一個通道進行心跳檢測,而服務端跟通道是一對多的關係,全部須要對該服務器鏈接的全部通道進行心跳檢測。
private void stopHeartbeatTimer() { if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) { try { // 取消定時器 heartbeatTimer.cancel(true); // 取消大量已排隊任務,用於回收空間 scheduled.purge(); } catch (Throwable e) { if (logger.isWarnEnabled()) { logger.warn(e.getMessage(), e); } } } heartbeatTimer = null; }
該方法是中止當前的心跳檢測。
該類實現了ExchangeServer接口,是信息交換服務器裝飾者,是ExchangeServer的裝飾器。該類就一個屬性ExchangeServer server,全部實現方法都調用了server屬性的方法。目前只有在p2p中被用到,代碼爲就不貼了,很簡單。
@SPI(HeaderExchanger.NAME) public interface Exchanger { /** * bind. * 綁定一個服務器 * @param url 服務器url * @param handler 數據交換處理器 * @return message server 數據交換服務器 */ @Adaptive({Constants.EXCHANGER_KEY}) ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException; /** * connect. * 鏈接一個服務器,也就是建立一個客戶端 * @param url 服務器url * @param handler 數據交換處理器 * @return message channel 返回數據交換客戶端 */ @Adaptive({Constants.EXCHANGER_KEY}) ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException; }
該接口是數據交換者接口,該接口是一個可擴展接口默認實現的是HeaderExchanger類,而且用到了dubbo SPI的Adaptive機制,優先實現url攜帶的配置。若是不瞭解dubbo SPI機制的能夠看《dubbo源碼解析(二)Dubbo擴展機制SPI》。那麼回到該接口定義的方法,定義了綁定和鏈接兩個方法,分別返回信息交互服務器和客戶端實例。
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 用傳輸層鏈接返回的client 建立對應的信息交換客戶端,默認開啓心跳檢測 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 用傳輸層綁定返回的server 建立對應的信息交換服務端 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
該類繼承了Exchanger接口,是Exchanger接口的默認實現,實現了Exchanger接口定義的兩個方法,分別調用的是Transporters的鏈接和綁定方法,再利用這這兩個方法返回的客戶端和服務端實例來建立信息交換的客戶端和服務端。
咱們知道Request對應的是ExchangeHandler接口實現對象來處理,但有些時候咱們須要不一樣數據類型對應不一樣的處理器,該類就是爲了支持這一需求所設計的。
public interface Replier<T> { /** * reply. * 回覆請求結果 * @param channel * @param request * @return response * @throws RemotingException */ Object reply(ExchangeChannel channel, T request) throws RemotingException; }
能夠看到該接口跟ExchangeHandler定義的方法也一一,只有請求的類型改成了範型。
該類實現了Replier接口,是回覆者調度器實現類。
/** * 默認回覆者 */ private final Replier<?> defaultReplier; /** * 回覆者集合 */ private final Map<Class<?>, Replier<?>> repliers = new ConcurrentHashMap<Class<?>, Replier<?>>();
這是該類的兩個屬性,緩存了回覆者集合和默認的回覆者。
/** * 從回覆者集合中找到該類型的回覆者,而且返回 * @param type * @return */ private Replier<?> getReplier(Class<?> type) { for (Map.Entry<Class<?>, Replier<?>> entry : repliers.entrySet()) { if (entry.getKey().isAssignableFrom(type)) { return entry.getValue(); } } if (defaultReplier != null) { return defaultReplier; } throw new IllegalStateException("Replier not found, Unsupported message object: " + type); } /** * 回覆請求 * @param channel * @param request * @return * @throws RemotingException */ @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Object reply(ExchangeChannel channel, Object request) throws RemotingException { return ((Replier) getReplier(request.getClass())).reply(channel, request); }
上述是該類中關鍵的兩個方法,reply仍是調用實現類的reply。根據請求的數據類型來使用指定的回覆者進行回覆。
該類實現了實現 Iterable 接口,是多消息的封裝,咱們直接看它的屬性:
/** * 消息集合 */ private final List messages = new ArrayList();
該類要和《dubbo源碼解析(九)遠程通訊——Transport層》的(八)MultiMessageHandler聯合着看。
該類繼承了AbstractChannelHandlerDelegate類,是心跳處理器。是用來處理心跳事件的,也接收消息上增長了對心跳消息的處理。該類是
@Override public void received(Channel channel, Object message) throws RemotingException { // 設置接收時間的時間戳屬性值 setReadTimestamp(channel); // 若是是心跳請求 if (isHeartbeatRequest(message)) { Request req = (Request) message; // 若是須要響應 if (req.isTwoWay()) { // 建立一個響應 Response res = new Response(req.getId(), req.getVersion()); // 設置爲心跳事件的響應 res.setEvent(Response.HEARTBEAT_EVENT); // 發送消息,也就是返回響應 channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if (logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } // 若是是心跳響應,則直接return if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName()); } return; } handler.received(channel, message); }
該方法是就是在handler處理消息上增長了處理心跳消息的功能,作到了功能加強。
該類跟Transporters的設計意圖是同樣的,Transporters我在《dubbo源碼解析(八)遠程通訊——開篇》的(十)Transporters已經講到了。Exchangers也用到了外觀模式。代碼爲就不貼了,能夠對照着Transporters來看,很簡單。
請求模型類,最重要的確定是模型的屬性,咱們來看看屬性:
/** * 心跳事件 */ public static final String HEARTBEAT_EVENT = null; /** * 只讀事件 */ public static final String READONLY_EVENT = "R"; /** * 請求編號自增序列 */ private static final AtomicLong INVOKE_ID = new AtomicLong(0); /** * 請求編號 */ private final long mId; /** * dubbo版本 */ private String mVersion; /** * 是否須要響應 */ private boolean mTwoWay = true; /** * 是不是事件 */ private boolean mEvent = false; /** * 是不是異常的請求 */ private boolean mBroken = false; /** * 請求數據 */ private Object mData;
響應模型,來看看它的屬性:
/** * 心跳事件 */ public static final String HEARTBEAT_EVENT = null; /** * 只讀事件 */ public static final String READONLY_EVENT = "R"; /** * ok. * 成功狀態碼 */ public static final byte OK = 20; /** * clien side timeout. * 客戶端側的超時狀態碼 */ public static final byte CLIENT_TIMEOUT = 30; /** * server side timeout. * 服務端側超時的狀態碼 */ public static final byte SERVER_TIMEOUT = 31; /** * channel inactive, directly return the unfinished requests. * 通道不活躍,返回未完成請求的狀態碼 */ public static final byte CHANNEL_INACTIVE = 35; /** * request format error. * 請求格式錯誤狀態碼 */ public static final byte BAD_REQUEST = 40; /** * response format error. * 響應格式錯誤狀態碼 */ public static final byte BAD_RESPONSE = 50; /** * service not found. * 服務找不到狀態碼 */ public static final byte SERVICE_NOT_FOUND = 60; /** * service error. * 服務錯誤狀態碼 */ public static final byte SERVICE_ERROR = 70; /** * internal server error. * 內部服務器錯誤狀態碼 */ public static final byte SERVER_ERROR = 80; /** * internal server error. * 客戶端錯誤狀態碼 */ public static final byte CLIENT_ERROR = 90; /** * server side threadpool exhausted and quick return. * 服務器端線程池耗盡並快速返回狀態碼 */ public static final byte SERVER_THREADPOOL_EXHAUSTED_ERROR = 100; /** * 響應編號 */ private long mId = 0; /** * dubbo 版本 */ private String mVersion; /** * 狀態 */ private byte mStatus = OK; /** * 是不是事件 */ private boolean mEvent = false; /** * 錯誤信息 */ private String mErrorMsg; /** * 返回結果 */ private Object mResult;
不少屬性跟Request模型的屬性同樣,而且含義也同樣,不過該模型多了不少的狀態碼。關鍵的是id跟請求一一對應。
public interface ResponseCallback { /** * done. * 處理請求 * @param response */ void done(Object response); /** * caught exception. * 處理異常 * @param exception */ void caught(Throwable exception); }
該接口是回調的接口,定義了兩個方法,分別是處理正常的響應結果和處理異常。
該類繼承了TelnetCodec,是信息交換編解碼器。在本文的開頭,我就寫到,dubbo將一條消息分紅了協議頭和協議體,用來解決粘包拆包問題,可是頭跟體在編解碼上有區別,咱們先來看看dubbo 的協議頭的配置:
上圖是官方文檔的圖片,可以清晰的看出協議中各個數據所佔的位數:
能夠看到一個該協議中前65位是協議頭,後面的都是協議體數據。那麼在編解碼中,協議頭是經過 Codec 編解碼,而body部分是用Serialization序列化和反序列化的。下面咱們就來看看該類對協議頭的編解碼。
// header length. /** * 協議頭長度:16字節 = 128Bits */ protected static final int HEADER_LENGTH = 16; // magic header. /** * MAGIC二進制:1101101010111011,十進制:55995 */ protected static final short MAGIC = (short) 0xdabb; /** * Magic High,也就是0-7位:11011010 */ protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0]; /** * Magic Low 8-15位 :10111011 */ protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1]; // message flag. /** * 128 二進制:10000000 */ protected static final byte FLAG_REQUEST = (byte) 0x80; /** * 64 二進制:1000000 */ protected static final byte FLAG_TWOWAY = (byte) 0x40; /** * 32 二進制:100000 */ protected static final byte FLAG_EVENT = (byte) 0x20; /** * 31 二進制:11111 */ protected static final int SERIALIZATION_MASK = 0x1f;
能夠看到 MAGIC是個固定的值,用來判斷是否是dubbo協議的數據包,而且MAGIC_LOW和MAGIC_HIGH分別是MAGIC的低位和高位。其餘的屬性用來幹嗎後面會講到。
@Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { // 若是消息是Request類型,對請求消息編碼 encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { // 若是消息是Response類型,對響應消息編碼 encodeResponse(channel, buffer, (Response) msg); } else { // 直接讓父類( Telnet ) 處理,目前是 Telnet 命令的結果。 super.encode(channel, buffer, msg); } }
該方法是根據消息的類型來分別進行編碼,分爲三種狀況:Request類型、Response類型以及其餘
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header. // 建立16字節的字節數組 byte[] header = new byte[HEADER_LENGTH]; // set magic number. // 設置前16位數據,也就是設置header[0]和header[1]的數據爲Magic High和Magic Low Bytes.short2bytes(MAGIC, header); // set request and serialization flag. // 16-23位爲serialization編號,用到或運算10000000|serialization編號,例如serialization編號爲11111,則爲00011111 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); // 繼續上面的例子,00011111|1000000 = 01011111 if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; // 繼續上面的例子,01011111|100000 = 011 11111 能夠看到011表明請求標記、雙向、是事件,這樣就設置了1六、1七、18位,後面19-23位是Serialization 編號 if (req.isEvent()) header[2] |= FLAG_EVENT; // set request id. // 設置32-95位請求id Bytes.long2bytes(req.getId(), header, 4); // encode request data. // // 編碼 `Request.data` 到 Body ,並寫入到 Buffer int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 對body數據序列化 ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // 若是該請求是事件 if (req.isEvent()) { // 特殊事件編碼 encodeEventData(channel, out, req.getData()); } else { // 正常請求編碼 encodeRequestData(channel, out, req.getData(), req.getVersion()); } // 釋放資源 out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); //檢驗消息長度 checkPayload(channel, len); // 設置96-127位:Body值 Bytes.int2bytes(len, header, 12); // write // 把header寫入到buffer buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }
該方法是對Request類型的消息進行編碼,仔細閱讀上述我寫的註解,結合協議頭各個位數的含義,好好品味我舉的例子。享受二進制位運算帶來的快樂,也能夠看到前半部分邏輯是對協議頭的編碼,後面還有對body值的序列化。
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header. // 建立16字節的字節數組 byte[] header = new byte[HEADER_LENGTH]; // set magic number. // 設置前16位數據,也就是設置header[0]和header[1]的數據爲Magic High和Magic Low Bytes.short2bytes(MAGIC, header); // set request and serialization flag. // 16-23位爲serialization編號,用到或運算10000000|serialization編號,例如serialization編號爲11111,則爲00011111 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); // 繼續上面的例子,00011111|1000000 = 01011111 if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; // 繼續上面的例子,01011111|100000 = 011 11111 能夠看到011表明請求標記、雙向、是事件,這樣就設置了1六、1七、18位,後面19-23位是Serialization 編號 if (req.isEvent()) header[2] |= FLAG_EVENT; // set request id. // 設置32-95位請求id Bytes.long2bytes(req.getId(), header, 4); // encode request data. // // 編碼 `Request.data` 到 Body ,並寫入到 Buffer int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 對body數據序列化 ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // 若是該請求是事件 if (req.isEvent()) { // 特殊事件編碼 encodeEventData(channel, out, req.getData()); } else { // 正常請求編碼 encodeRequestData(channel, out, req.getData(), req.getVersion()); } // 釋放資源 out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); //檢驗消息長度 checkPayload(channel, len); // 設置96-127位:Body值 Bytes.int2bytes(len, header, 12); // write // 把header寫入到buffer buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { Serialization serialization = getSerialization(channel); // header. // 建立16字節大小的字節數組 byte[] header = new byte[HEADER_LENGTH]; // set magic number. // 設置前0-15位爲魔數 Bytes.short2bytes(MAGIC, header); // set request and serialization flag. // 設置響應標誌和序列化id header[2] = serialization.getContentTypeId(); // 若是是心跳事件,則設置第18位爲事件 if (res.isHeartbeat()) header[2] |= FLAG_EVENT; // set response status. // 設置24-31位爲狀態碼 byte status = res.getStatus(); header[3] = status; // set request id. // 設置32-95位爲請求id Bytes.long2bytes(res.getId(), header, 4); // 寫入數據 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 對body進行序列化 ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // encode response data or error message. if (status == Response.OK) { if (res.isHeartbeat()) { // 對心跳事件編碼 encodeHeartbeatData(channel, out, res.getResult()); } else { // 對普通響應編碼 encodeResponseData(channel, out, res.getResult(), res.getVersion()); } } else out.writeUTF(res.getErrorMessage()); // 釋放 out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12); // write buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } catch (Throwable t) { // clear buffer buffer.writerIndex(savedWriteIndex); // send error message to Consumer, otherwise, Consumer will wait till timeout. //若是在寫入數據失敗,則返回響應格式錯誤的返回碼 if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) { Response r = new Response(res.getId(), res.getVersion()); r.setStatus(Response.BAD_RESPONSE); if (t instanceof ExceedPayloadLimitException) { logger.warn(t.getMessage(), t); try { r.setErrorMessage(t.getMessage()); // 發送響應 channel.send(r); return; } catch (RemotingException e) { logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e); } } else { // FIXME log error message in Codec and handle in caught() of IoHanndler? logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t); try { r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t)); channel.send(r); return; } catch (RemotingException e) { logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e); } } } // Rethrow exception if (t instanceof IOException) { throw (IOException) t; } else if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } else { throw new RuntimeException(t.getMessage(), t); } } }
該方法是對Response類型的消息進行編碼,該方法裏面我沒有舉例子演示如何進行編碼,不過過程跟encodeRequest相似。
@Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); // 讀取前16字節的協議頭數據,若是數據不滿16字節,則讀取所有 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; buffer.readBytes(header); // 解碼 return decode(channel, buffer, readable, header); } @Override protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. // 覈對魔數(該數字固定) if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; // 將 buffer 徹底複製到 `header` 數組中 if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i++) { if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } return super.decode(channel, buffer, readable, header); } // check length. // Header 長度不夠,返回須要更多的輸入,解決拆包現象 if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // get data length. int len = Bytes.bytes2int(header, 12); // 檢查信息頭長度 checkPayload(channel, len); int tt = len + HEADER_LENGTH; // 總長度不夠,返回須要更多的輸入,解決拆包現象 if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } // limit input stream. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { // 對body反序列化 return decodeBody(channel, is, header); } finally { // 若是不可用 if (is.available() > 0) { try { // 打印錯誤日誌 if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } // 跳過未讀完的流 StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
該方法就是解碼前的一些覈對過程,包括檢測是否爲dubbo協議,是否有拆包現象等,具體的解碼在decodeBody方法。
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { // 用並運算符 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // get request id. // 得到請求id long id = Bytes.bytes2long(header, 4); // 若是第16位爲0,則說明是響應 if ((flag & FLAG_REQUEST) == 0) { // decode response. Response res = new Response(id); // 若是第18位不是0,則說明是心跳事件 if ((flag & FLAG_EVENT) != 0) { res.setEvent(Response.HEARTBEAT_EVENT); } // get status. byte status = header[3]; res.setStatus(status); try { ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); // 若是響應是成功的 if (status == Response.OK) { Object data; if (res.isHeartbeat()) { // 若是是心跳事件,則心跳事件的解碼 data = decodeHeartbeatData(channel, in); } else if (res.isEvent()) { // 若是是事件,則事件的解碼 data = decodeEventData(channel, in); } else { // 不然執行普通解碼 data = decodeResponseData(channel, in, getRequestData(id)); } // 從新設置響應結果 res.setResult(data); } else { res.setErrorMessage(in.readUTF()); } } catch (Throwable t) { res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } return res; } else { // decode request. // 對請求類型解碼 Request req = new Request(id); // 設置版本號 req.setVersion(Version.getProtocolVersion()); // 若是第17位不爲0,則是雙向 req.setTwoWay((flag & FLAG_TWOWAY) != 0); // 若是18位不爲0,則是心跳事件 if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { // 反序列化 ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; if (req.isHeartbeat()) { // 若是請求是心跳事件,則心跳事件解碼 data = decodeHeartbeatData(channel, in); } else if (req.isEvent()) { // 若是是事件,則事件解碼 data = decodeEventData(channel, in); } else { // 不然,用普通解碼 data = decodeRequestData(channel, in); } // 把從新設置請求數據 req.setData(data); } catch (Throwable t) { // bad request // 設置是異常請求 req.setBroken(true); req.setData(t); } return req; } }
該方法就是解碼的過程,而且對協議頭和協議體分開解碼,協議頭編碼是作或運算,而解碼則是作並運算,協議體用反序列化的方式解碼,一樣也是分爲了Request類型、Response類型進行解碼。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的源碼解,其中關鍵的是設計了Request和Response模型,整個信息交換都圍繞這兩大模型,而且設計了dubbo協議,解決拆包粘包問題,在信息交換中協議頭攜帶的信息起到了關鍵做用,也知足了rpc調用的一些需求。下一篇我會講解遠程通訊的buffer部分。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。