先推薦你們閱讀dubbo官網源碼解讀服務調用流程
一節,傳送門: dubbo.apache.org/zh-cn/docs/…html
Dubbo同步調用仍是異步調用的邏輯是在DubboInvoker中,Dubbo 實現同步和異步調用比較關鍵的一點就在於由誰調用 ResponseFuture 的 get
方法。前端
ResponseFuture是一個接口,ResponseFuture的默認實現是DefaultFuture,當服務消費者還未接收到調用結果時,用戶線程調用 get 方法會被阻塞住。apache
通常狀況下,服務調用方會併發調用多個服務,每一個用戶線程發送請求後,會調用不一樣 DefaultFuture 對象的 get 方法進行等待。 一段時間後,服務調用方的線程池會收到多個響應對象。這個時候要考慮一個問題,如何將每一個響應對象傳遞給相應的 DefaultFuture 對象,且不出錯。答案是經過調用編號。DefaultFuture 被建立時,會要求傳入一個 Request 對象。此時 DefaultFuture 可從 Request 對象中獲取調用編號,並將 <調用編號, DefaultFuture 對象> 映射關係存入到靜態 Map 中,即 FUTURES。線程池中的線程在收到 Response 對象後,會根據 Response 對象中的調用編號到 FUTURES 集合中取出相應的 DefaultFuture 對象,而後再將 Response 對象設置到 DefaultFuture 對象中。最後再喚醒用戶線程,這樣用戶線程便可從 DefaultFuture 對象中獲取調用結果了。bash
DefaultFuture
中的sent變量在客戶端向服務端發送請求成功後會寫入,以代表消息發送完成。併發
-->NettyChannel#send
-->io.netty.channel.ChannelOutboundInvoker#writeAndFlush
-->NettyClientHandler#write
-->.DefaultFuture#sent
複製代碼
若是客戶端沒有成功發送消息,服務端不會返回響應(Response),DefaultFuture
中的sent
變量也沒有被寫入,在DefaultFuture#getTimeoutMessage
會根據sent
是否大於0,輸出客戶端超時異常。框架
若是客戶端成功發送消息,服務端返回響應(Response),DefaultFuture
中的sent
變量被寫入,在DefaultFuture#getTimeoutMessage
會根據sent
是否大於0,服務端超時異常。異步
private String getTimeoutMessage(boolean scan) {
long nowTimestamp = System.currentTimeMillis();
return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
+ (scan ? " by scan timer" : "") + ". start time: "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ","
+ (sent > 0 ? " client elapsed: " + (sent - start)
+ " ms, server elapsed: " + (nowTimestamp - sent)
: " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
+ timeout + " ms, request: " + request + ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress();
}
複製代碼
當發生超時異常的時候是沒有Response返回的,dubbo的客戶端在建立DefaultFuture的時候會建立一個TimeoutCheckTask
的延時任務,當超時時間到達後就會執行。這段代碼不難理解。ide
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// timeout check
timeoutCheck(future);
return future;
}
private static class TimeoutCheckTask implements TimerTask {
private DefaultFuture future;
TimeoutCheckTask(DefaultFuture future) {
this.future = future;
}
@Override
public void run(Timeout timeout) {
if (future == null || future.isDone()) {
return;
}
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
// 若是響應正常則返回調用結果
if (res.getStatus() == Response.OK) {
return res.getResult();
}
// 拋出超時異常
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
// 消費方調用異常拋出RemotingException
throw new RemotingException(channel, res.getErrorMessage());
}
複製代碼
使用延時任務的方式會在調用超時的時候也會使RPC調用流程完整,而不至於一直停留在!isDone()
狀態,相對來講這種方式可能更好一些。ui