dubbo之RpcContext

dubbo之RpcContext

RpcContext 是一個 ThreadLocal 的臨時狀態記錄器,當接收到 RPC 請求,或發起 RPC 請求時,RpcContext 的狀態都會變化。好比:A 調 B,B 再調 C,則 B 機器上,在 B 調 C 以前,RpcContext 記錄的是 A 調 B 的信息,在 B 調 C 以後,RpcContext 記錄的是 B 調 C 的信息。app

使用

消費端

// 遠程調用以前,經過attachment傳KV給提供方
RpcContext.getContext().setAttachment("userKey", "userValue");
// 遠程調用
xxxService.xxx();
// 本端是否爲消費端,這裏會返回true
boolean isConsumerSide = RpcContext.getContext().isConsumerSide();
// 獲取最後一次調用的提供方IP地址
String serverIP = RpcContext.getContext().getRemoteHost();
// 獲取當前服務配置信息,全部配置信息都將轉換爲URL的參數
String application = RpcContext.getContext().getUrl().getParameter("application");
// 注意:每發起RPC調用,上下文狀態會變化
yyyService.yyy();
// 此時 RpcContext 的狀態已變化 
RpcContext.getContext();

提供方

public class XxxServiceImpl implements XxxService {

    public void xxx() {
        // 經過RpcContext獲取用戶傳參,這裏會返回userValue
        String value = RpcContext.getContext().getAttachment("userKey");
        // 本端是否爲提供端,這裏會返回true
        boolean isProviderSide = RpcContext.getContext().isProviderSide();
        // 獲取調用方IP地址
        String clientIP = RpcContext.getContext().getRemoteHost();
        // 獲取當前服務配置信息,全部配置信息都將轉換爲URL的參數
        String application = RpcContext.getContext().getUrl().getParameter("application");
        // 注意:每發起RPC調用,上下文狀態會變化
        yyyService.yyy();
        // 此時本端變成消費端,這裏會返回false
        boolean isProviderSide = RpcContext.getContext().isProviderSide();
    } 
}

異步調用

//使用RpcContext 來實現異步調用
Future<T> future = RpcContext.getContext().asyncCall(new Callable<T>(){...});

//獲取異步調用的結果,經過Future.
Future<T> future = RpcContext.getContext().getFuture();

原理

消費端在執行Rpc調用以前,通過Filter處理, 會將信息寫入RpcContext.
ConsumerContextFilter:異步

RpcContext.getContext()
        setInvoker(invoker)
        .setInvocation(invocation)
        .setLocalAddress(NetUtils.getLocalHost(), 0)
        .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

服務端在執行調用以前,也會通過Filter處理,將信息寫入RpcContext. 見ContextFilterasync

RpcContext.getContext()
        .setInvoker(invoker)
        .setInvocation(invocation)
        .setAttachments(attachments) //attachment 來自於RpcInvocation中的attachment
        .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

provider端的remoteAddress是從tcp鏈接中獲取的,見DubboProtocol類:tcp

RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());

所以每發起RPC調用,上下文狀態會變化。ide

消費者在執行調用以前(AbstractInvoker),會將RpcContext中的內容寫入到invocation,實現參數傳遞url

Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
    invocation.addAttachmentsIfAbsent(context);  //只新增,不修改
}

在異步實現中,將async參數設置爲true, 也會傳遞到提供者來實現異步調用。code

setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());

DubboInvoker在執行調用的時候,會判斷ASYNC_KEY是否爲true,
若是是,則會向context中寫入future對象:server

...
else if (isAsync) {
    ResponseFuture future = currentClient.request(inv, timeout) ;
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
}
...

不等待調用結果返回。以實現消費端的異步調用。對象

判斷是否爲Provider或者Consumer則是經過remoteAdress和localAddress作對比來實現的。
源碼以下:rem

// address爲remoteAddress,在調用以前Filter中寫入
String host;
if (address.getAddress() == null) {
    host = address.getHostName();
} else {
    host = address.getAddress().getHostAddress();
}
return url.getPort() == address.getPort() && 
        NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(NetUtils.getIpByHost(host)));
相關文章
相關標籤/搜索