dubbo相關說明(官方):bootstrap
在RPC中,Protocol是核心層,也就是隻要有Protocol + Invoker + Exporter就能夠完成非透明的RPC調用,而後在Invoker的主過程上Filter攔截點。api
圖中的Consumer和Provider是抽象概念,只是想讓看圖者更直觀的瞭解哪些類分屬於客戶端與服務器端,不用Client和Server的緣由是Dubbo在不少場景下都使用Provider, Consumer, Registry, Monitor劃分邏輯拓普節點,保持統一律念。服務器
而Cluster是外圍概念,因此Cluster的目的是將多個Invoker假裝成一個Invoker,這樣其它人只要關注Protocol層Invoker便可,加上Cluster或者去掉Cluster對其它層都不會形成影響,由於只有一個提供者時,是不須要Cluster的。app
Proxy層封裝了全部接口的透明化代理,而在其它層都以Invoker爲中心,只有到了暴露給用戶使用時,才用Proxy將Invoker轉成接口,或將接口實現轉成Invoker,也就是去掉Proxy層RPC是能夠Run的,只是不那麼透明,不那麼看起來像調本地服務同樣調遠程服務。socket
而Remoting實現是Dubbo協議的實現,若是你選擇RMI協議,整個Remoting都不會用上,Remoting內部再劃爲Transport傳輸層和Exchange信息交換層,Transport層只負責單向消息傳輸,是對Mina,Netty,Grizzly的抽象,它也能夠擴展UDP傳輸,而Exchange層是在傳輸層之上封裝了Request-Response語義。tcp
dubbo遠程調用詳細過程,官方中有個大概的流程,根據本身的理解跟蹤畫出下面的調用鏈(默認使用dubbo協議):ide
調用鏈經過一系列的Invoker和filter,最終經過netty實現遠程通訊。ui
其中:this
經過LazyConnectExchangeClient.request()中調用initClient()對NettyClient進行初始化;編碼
private void initClient() throws RemotingException {
if (client != null )
return;
if (logger.isInfoEnabled()) {
logger.info("Lazy connect to " + url);
}
connectLock.lock();
try {
if (client != null)
return;
//建立鏈接
this.client = Exchangers.connect(url, requestHandler);
} finally {
connectLock.unlock();
}
}
經過NettyClient建立鏈接:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
//添加編解碼器,對消息進行編碼、解碼
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
//添加消息接收、發送處理
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
adapter.getEncoder():InternalEncoder:
private class InternalEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception { com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { //編碼處理 codec.encode(channel, buffer, msg); } finally { NettyChannel.removeChannelIfDisconnected(ch); } return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer()); } }
ExchangeCodec:
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//對請求參數編碼
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
encodeRequest():
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header. 協議頭 byte[] header = new byte[HEADER_LENGTH]; // set magic number. 添加2字節的魔數 Bytes.short2bytes(MAGIC, header); //添加序1個字節的列化標識 // set request and serialization flag. header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; if (req.isEvent()) header[2] |= FLAG_EVENT; // set request id. 添加8字節的請求惟一id Bytes.long2bytes(req.getId(), header, 4); // encode request data. 對請求數據進行encode int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { //經過子類DubboCodec編碼請求 encodeRequestData(channel, out, req.getData()); } out.flushBuffer(); bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12); // write 寫入buffer中(請求起始地址、結束地址以及header) buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }
DubboCodec.encodeRequestData()發送消息:
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { RpcInvocation inv = (RpcInvocation) data; //請求消息中包含dubbo版本號、接口、參數等信息 out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION)); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); out.writeUTF(inv.getMethodName()); out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++){ out.writeObject(encodeInvocationArgument(channel, inv, i)); } out.writeObject(inv.getAttachments()); }
消息數據包含dubbo版本號、接口名稱、、方法名稱、參數類等信息,將它們序列化後寫入到類型到buffer中。
經過接收到的請求數據,進行decode,解碼完成後經過調用連進入NettyHandler.messageReceived()進行處理接收到的消息=>HeaderExchangeHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//請求消息
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
//處理請求
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//......
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
handleRequest()方法:
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
//處理失敗的消息
if (req.isBroken()) {
//.........
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data. 服務端處理接收到的消息,handler爲DubboProtocol的內部實現
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
DubboProtocol.requestHandler:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//獲取invoker
Invoker<?> invoker = getInvoker(channel, inv);
//若是是callback 須要處理高版本調用低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//經過invoke調用目標方法
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
經過invoke調用目標方法,並最終執行接口相關邏輯;並把結果封裝爲response返回給客戶端(詳見HeaderExchangeHandler.received()中channel.send(response)方法)。
同理:返回給客戶端的reponse對象也會通過編碼encode:包括魔數、序列化協議、響應碼、requestId等。
服務端給客戶端返回數據以後,客戶端會收到IO事件,NettyClient對響應數據進行解碼(即解析requestId、響應碼、序列化協議、響應數據等);解碼完成後經過client中綁定的NettyHandler調用其received()方法處理(相似服務端解析邏輯)。
NettyHandler:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
//........處理請求request——服務端
} else if (message instanceof Response) {
//響應response——服務消費端
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
handleResponse(channel, (Response) message):
static void handleResponse(Channel channel, Response response) throws RemotingException { //響應內容不爲空而且不是心跳檢測的請求響應 if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
DefaultFuture:
public static void received(Channel channel, Response response) {
try {
//移除當前future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
//喚醒等待響應結果的線程
future.doReceived(response);
} else {
//.....
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
//喚醒等待的線程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
調用netty發送數據後,該請求線程一直DefaultFuture.await()等待響應。
注:經過綁定的NettyServer:NettyHandler.messageReceived()——>HeaderExchangeHandler喚醒DefaultFuture後續處理