因爲rpc底層涉及網絡編程接口,線程模型,網絡數據結構,服務協議,細到字節的處理。牽涉內容較多,今天就先從一個點提及。
說說,dubbo經過netty框架作傳輸層,從接到數據字節流到把字節轉換爲dubbo上層可讀的Request消息對象的過程。當前dubbo還支持mina,grizzly作底層傳輸層。
這裏包括兩部,反序列化和解碼。我打算分兩篇寫。這篇主要是說解碼的過程。
就是下面這個dubbo架構圖中,紅框中的部分。java
既然是netty作傳輸層,netty的基礎得提一點。
netty框架是經過管道(ChannelPipeline)模型處理網絡數據流的,每一個管道中有多個處理接點(ChannelHandler),
節點分爲,進站(client請求進服務端口)和出站(請求響應出服務端口)兩種。好比一個進站消息老是,順序的(順序是程序中編碼指定的)經過進站處理節點。
同理出站消息,老是順序的經過出站節點到達網絡接口。編程
dubbo2.5.6版本,傳輸層dubbo提供有netty3和netty4兩種實現,初始化netty通道都在NettyServer類裏,兩個類同名,包名不一樣。bootstrap
具體,netty3在NettyServer類裏doOpen()方法:數組
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { //編解碼器的初始化 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder());//設置解碼hander pipeline.addLast("encoder", adapter.getEncoder());//設置編碼hander pipeline.addLast("handler", nettyHandler);//自定義NettyHandler 擴展自netty雙向handler基類,能夠接受進站和出站數據流 return pipeline; //進站的請求,先通過adapter.getDecoder()handler處理,再由nettyHandler處理 //出站的請求,先通過nettyHandler處理 再由adapter.getEncoder()handler處理 } }); // bind channel = bootstrap.bind(getBindAddress()); }
netty4版本NettyServer類裏doOpen()方法:網絡
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ServerBootstrap(); //acceptor 事件循環線程 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); //client channel事件循環線程 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { //編解碼器的初始化 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder())//設置解碼hander .addLast("encoder", adapter.getEncoder())//設置編碼hander .addLast("handler", nettyServerHandler);//自定義NettyServerHandler 擴展netty雙向handler基類,能夠接受進站和出站數據流 //進站的請求,先通過adapter.getDecoder()handler處理,再由nettyServerHandler處理 //出站的請求,先通過nettyServerHandler處理 再由adapter.getEncoder()handler處理 } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
本次說的是進站流程,請求數據解析成Request對象過程,經過上面的代碼和netty特性可知,
進站數據先經過解碼hander,經解碼成消息Request對象後,再到自定義handler,而後由自定義hanlder經過裝飾模式,調用實際服務。數據結構
先看下解碼handler的實現:
由adapter.getDecoder()這句跟蹤到NettyCodecAdapter類的getDecoder()方法
public ChannelHandler getDecoder() {
return decoder;
}
能夠看到,這個方法獲取的解碼handler,decoder,是NettyCodecAdapter類的私有屬性架構
private final ChannelHandler decoder = new InternalDecoder();併發
看下InternalDecoder類定義,netty3版本:app
* 這裏須要些netty的知識,繼承SimpleChannelUpstreamHandler,代表它是進站handler * 因此進站的數據流,都會通過本handler對象,具體就是messageReceived方法。 */ private class InternalDecoder extends SimpleChannelUpstreamHandler { private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;//這是dubbo根據nio本身實現的buffer @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { Object o = event.getMessage(); if (!(o instanceof ChannelBuffer)) {//這裏ChnnelBuffer是netty基於jdk nio ByteBuffer 實現 ctx.sendUpstream(event); return; } ChannelBuffer input = (ChannelBuffer) o; int readable = input.readableBytes();//到這就是從netty event對象取數據的過程。 if (readable <= 0) { return; } com.alibaba.dubbo.remoting.buffer.ChannelBuffer message; if (buffer.readable()) { if (buffer instanceof DynamicChannelBuffer) { buffer.writeBytes(input.toByteBuffer()); message = buffer; } else { int size = buffer.readableBytes() + input.readableBytes(); message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer( size > bufferSize ? size : bufferSize);//bufferSize 不指定是8k,這裏表示最小8K message.writeBytes(buffer, buffer.readableBytes()); message.writeBytes(input.toByteBuffer());//把netty 讀到的字節流寫入message } } else {//直接經過構造器,構造message message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer( input.toByteBuffer()); } NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); Object msg; int saveReaderIndex; //從netty框架,讀取到的數據,放入message後,下面就是針對message的反序列化和解碼過程。 try { // decode object. do { saveReaderIndex = message.readerIndex(); try { //解碼,這裏麪包括的反序列化 msg = codec.decode(channel, message);//重要!!!經過具體編解碼實例codec完成解碼 } catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//若是解碼結果是,Codec2.DecodeResult.NEED_MORE_INPUT,表示,須要更多數據 message.readerIndex(saveReaderIndex);//很重要,設置readerIndex爲,解碼讀取前的位置,爲了下次再從頭讀取。 break; } else { if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } if (msg != null) {//解碼完成,這裏的msg已是Request對象。!! Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while (message.readable()); } finally { if (message.readable()) { message.discardReadBytes(); buffer = message; } else { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; } NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } //處理異常 @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { ctx.sendUpstream(e);//向下一個傳遞。通常在最後一個handler處理異常 } }
netty4版本:框架
/*** * netty4 擴展了ByteToMessageDecoder * 重寫decode 方法,解碼完成後,不用像netty3手動Channels.fireMessageReceived 發送事件, * netty4自動把對象,傳遞到下一個handler */ private class InternalDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { ChannelBuffer message = new NettyBackedChannelBuffer(input); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { //保存初始,讀取位置 saveReaderIndex = message.readerIndex(); try { msg = codec.decode(channel, message);//重要!!!經過具體編解碼實例codec完成解碼 } catch (IOException e) { throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { //解碼失敗後,從新設置readerIndex爲讀取前位置 message.readerIndex(saveReaderIndex); break; } else { //is it possible to go here ? if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data."); } if (msg != null) { //解碼成功後,加入到out list中,傳遞到下一個處理handler out.add(msg); } } } while (message.readable()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } }
能夠看到,不管netty3,仍是netty4都是經過,NettyCodecAdapter的codec屬性完成解碼的,
這裏有個概念,編解碼handler是經過編解碼實例完成編解碼的,這裏的編解碼實例就是codec
而codec實例是由它構造函數從上層方法傳遞的。以下
public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) { this.codec = codec; this.url = url; this.handler = handler; int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE); this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE; }
再回到NettySever類中NettyCodecAdapter的構造語句
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
跟進getCodec()方法,這個方就是獲取實際編解碼方案的。這個方法的實如今NettyServer的祖先類AbstractEndpoint中:
protected Codec2 getCodec() { return codec; } //能夠看到codec在構造方法裏建立的 public AbstractEndpoint(URL url, ChannelHandler handler) { super(url, handler); this.codec = getChannelCodec(url);//根據url配置,構造編碼解碼器(經過spi獲得DubboCountCodec類實例) this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); } // 再跟到getChannelCodec方法 protected static Codec2 getChannelCodec(URL url) { //經過spi機制,從url裏獲取編解碼方案,這裏是dubbo。取不到就是telnet //dubbo編解碼方案,實現類是DubboCountCodec String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } }
這裏看下,dubbo中的全部編解碼類結構:
能夠看到,全部編解碼器實現,都擴展了Codec2接口。同時Codec2也是個spi擴展點。
接口Codec2,以下:
@SPI public interface Codec2 { /** * spi 獲取編碼器 * @param channel * @param buffer * @param message * @throws IOException */ @Adaptive({Constants.CODEC_KEY}) void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException; /*** * spi 獲取解碼器 * @param channel * @param buffer * @return * @throws IOException */ @Adaptive({Constants.CODEC_KEY}) Object decode(Channel channel, ChannelBuffer buffer) throws IOException; enum DecodeResult { NEED_MORE_INPUT, SKIP_SOME_INPUT } }
具體實現經過spi獲取,dubbo編解碼方案實例就是DubboCountCodec
那麼看下DubboCountCodec類,以及decode方法:
public final class DubboCountCodec implements Codec2 { private DubboCodec codec = new DubboCodec();//具體dubbo協議編解碼方案實現 public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { codec.encode(channel, buffer, msg); } public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex();//記錄讀取初始位置 MultiMessage result = MultiMessage.create();//解碼後對象容器。list,可放多個消息 do { Object obj = codec.decode(channel, buffer);//解碼過程在DubboCodec類中的decode方法裏 if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//須要接受更多信息 buffer.readerIndex(save);//恢復讀取位置 break; } else {//解碼完成,加到對象容器 result.addMessage(obj); logMessageLength(obj, buffer.readerIndex() - save);//記錄日誌,可忽略 save = buffer.readerIndex();//更新讀取位置 } } while (true); if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1) { return result.get(0);//返回解碼後對象 } return result; } private void logMessageLength(Object result, int bytes) { if (bytes <= 0) { return; } if (result instanceof Request) { try { ((RpcInvocation) ((Request) result).getData()).setAttachment( Constants.INPUT_KEY, String.valueOf(bytes)); } catch (Throwable e) { /* ignore */ } } else if (result instanceof Response) { try { ((RpcResult) ((Response) result).getResult()).setAttachment( Constants.OUTPUT_KEY, String.valueOf(bytes)); } catch (Throwable e) { /* ignore */ } } } }
分析DubboCodec類以前,先說下dubbo協議消息格式,它包括消息頭和消息體:
前2個字節:
爲協議魔數,固定值oxdabb
第三字節:
第1比特(0/1)表示是請求消息,仍是響應消息
第2比特(0/1)表示是是否必須雙向通訊,即有請求,必有響應
第3比特(0/1)表示是是不是,心跳消息
第低5位比特,表示一個表示消息序列化的方式(1,是dubbo ,2,是hessian...)
第四字節:
只在響應消息中用到,表示響應消息的狀態,是成功,失敗等
第5-12字節:
8個字節,表示一個long型數字,是reqeustId
第13—16字節:
4個字節,表示消息體的長度(字節數)
消息體,不固定長度
是請求消息時,表示請求數據
是響應消息時,表示方法調用返回結果。
編碼和解碼主要是對消息頭的設置和解析。序列化和反序列化主要是對消息體的操做。
先看DubboCodec的關係圖:
DubboCodec類decode方法的實如今其父類ExchangeCodec中:
//先看下類中定義的常量: // header length.消息頭長度 protected static final int HEADER_LENGTH = 16; // magic header. protected static final short MAGIC = (short) 0xdabb;//1101 1010 1011 1011魔數 protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];//高字節 protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];//低字節 // message flag. protected static final byte FLAG_REQUEST = (byte) 0x80;//1000,0000//表示消息類型 protected static final byte FLAG_TWOWAY = (byte) 0x40;//0100,0000//表示是否雙向通訊 protected static final byte FLAG_EVENT = (byte) 0x20;//0010,0000//表示是不是心跳事件 protected static final int SERIALIZATION_MASK = 0x1f;//0001,1111/表示是序列化實現類型 /*** * 解碼入口方法 * @param channel * @param buffer * @return * @throws IOException */ public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { //取得可讀的字節數 int readable = buffer.readableBytes(); //header 最大16字節 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; //從buffer中讀16字節到header數組中 buffer.readBytes(header); //調用自己decode方法 return decode(channel, buffer, readable, header); } /*** * 具體協議解析方法,本方法主要是讀取驗證消息頭的過程 * @param channel * @param buffer * @param readable * @param header * @return * @throws IOException */ protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { //前兩字節,不是dubbo魔數 int length = header.length; //若是可讀的字節數目,大於16字節 if (header.length < readable) { //給header擴容到readable大小 header = Bytes.copyOf(header, readable); //把buffer剩下的字節讀到header中,這裏多於16字節 buffer.readBytes(header, length, readable - length); } //上層方法,第一字節已經驗證過,這個從第二字節開始驗證。 for (int i = 1; i < header.length - 1; i++) { //若是發現,後續字節有dubbo協議的開頭 if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { //重置readerIndex 位置,到魔數開始的地方。 buffer.readerIndex(buffer.readerIndex() - header.length + i); //把魔數開始位置,之前的數據,覆蓋賦給header(下面調用super.decode解析) header = Bytes.copyOf(header, i); break; } } //上層方法是dubbo telnet 編解碼實現 return super.decode(channel, buffer, readable, header); } // check length. if (readable < HEADER_LENGTH) {//小於16字節,返回 須要更多對象 return DecodeResult.NEED_MORE_INPUT; } //get data length.讀取header[]最後四字節,構造一個int的數據, //根據dubbo協議,這個是消息體的長度 int len = Bytes.bytes2int(header, 12); //檢查數據大小 checkPayload(channel, len);//默認爲8M int tt = len + HEADER_LENGTH;//總的消息大小,消息頭加消息實體 if (readable < tt) {//若是可讀取的,不夠消息總大小,就返回 須要更多數據 return DecodeResult.NEED_MORE_INPUT; } // limit input stream.這個時候,buffer的readerIndex位置已經是,讀完header後的位置,接下來的len長度的數據,全是消息體的數據。 ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { //解碼反序列化成Reqeust或者Response對象,decodeBody方法被子類DubboCodec重寫了 //這裏要看DubboCodec的decodeBody的方法 return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
DubboCodec類的decodeBody方法:
/*** * 解碼,是從輸出流 is 取字節數據,經反序列化,構造Request 和Response對象的過程。 * @param channel * @param is * @param header * @return * @throws IOException */ protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { //消息頭第3字節和SERIALIZATION_MASK&操做後,就能夠獲得,序列化/反序列化方案 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); //根據序列化方案id,或者url指定,經過spi機制去獲取序列化實現。dubbo協議默認用hession2序列化方案 //是放在消息頭flag 裏的。這裏proto 值是2 //獲取具體用序列化/反序列化實現 Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // get request id.header字節從第5到12 8個字節,是請求id long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) {//根據flag&FLAG_REQUEST後,判斷須要解碼的消息類型 // decode response. Response res = new Response(id); if ((flag & FLAG_EVENT) != 0) { res.setEvent(Response.HEARTBEAT_EVENT); } // get status. 獲取響應 狀態 成功,失敗等 byte status = header[3]; res.setStatus(status); if (status == Response.OK) {//返回結果狀態 ok成功 try { Object data; if (res.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (res.isEvent()) {//事件消息,反序列化 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else {//業務調用結果消息 解碼構造 DecodeableRpcResult 對象的過程 DecodeableRpcResult result; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否在io 線程內解碼 result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); //!!!Response消息反序列化 就是把調用結果返回值 從is裏反序列化出來,放在 DecodeableRpcResult類的result 字段的過程。 result.decode(); } else { //不在io線程解碼,要先經過readMessageData方法把調用結果數組取出後, //放在UnsafeByteArrayInputStream對象,存在DecodeableRpcResult對象裏,後續經過上層方法解碼。 result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } //同時把DecodeableRpcResult對象放入Response result字段。 res.setResult(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode response failed: " + t.getMessage(), t); } //異常處理,設置status和異常信息 res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } } else { //異常處理,設置異常信息 res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); } return res; } else {//反解碼Requset 消息類型 // decode request. Request req = new Request(id); req.setVersion("2.0.0"); req.setTwoWay((flag & FLAG_TWOWAY) != 0); if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (req.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else {//業務調用請求消息 解碼構造 DecodeableRpcInvocation 對象的過程 DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否 io 線程內解碼 inv = new DecodeableRpcInvocation(channel, req, is, proto); //!!!Requset 類型反序列化方法 inv.decode(); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } //同時把DecodeableRpcInvocation對象放入Request data字段。 req.setData(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode request failed: " + t.getMessage(), t); } // bad request 異常請求對象設置 req.setBroken(true); req.setData(t); } return req; } } /*** * 獲取反序列化方案 * @param serialization * @param url * @param is * @return * @throws IOException */ private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) throws IOException { return serialization.deserialize(url, is); } /*** * 讀取is 裏的可用數據 * @param is * @return * @throws IOException */ private byte[] readMessageData(InputStream is) throws IOException { if (is.available() > 0) { byte[] result = new byte[is.available()]; is.read(result); return result; } return new byte[]{}; }
RPC調用請求:DecodeableRpcInvocation 類反序列化方法:
public void decode() throws Exception { if (!hasDecoded && channel != null && inputStream != null) { try {//具體在decode重載方法裏 decode(channel, inputStream); } catch (Throwable e) {//異常請求設置 if (log.isWarnEnabled()) { log.warn("Decode rpc invocation failed: " + e.getMessage(), e); } request.setBroken(true); request.setData(e); } finally { hasDecoded = true; } } } /** * 反序列化,解碼 經過反序列化還原 * RpcInvocation 類的 * private String methodName; private Class<?>[] parameterTypes; private Object[] arguments; private Map<String, String> attachments; 是個屬性值,就像在客戶端請求時設置的同樣。 * @param channel channel. * @param input input stream. * @return * @throws IOException */ public Object decode(Channel channel, InputStream input) throws IOException { //獲取反序列化方案 ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); //反序列化出,dubbo版本,路徑,服務版本信息,設置到attachments裏,這讀取順序和序列化時的寫入順序也一致 //固然,序列化方案也一致。這裏默認都是hissen2 //調用ObjectInput的readUTF()反序列化方法,依次獲取調用信息 setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF()); setAttachment(Constants.PATH_KEY, in.readUTF()); setAttachment(Constants.VERSION_KEY, in.readUTF()); //讀取方法名 setMethodName(in.readUTF()); //反序列化,方法請求參數類型, try { Object[] args; Class<?>[] pts; String desc = in.readUTF(); if (desc.length() == 0) { pts = DubboCodec.EMPTY_CLASS_ARRAY; args = DubboCodec.EMPTY_OBJECT_ARRAY; } else { pts = ReflectUtils.desc2classArray(desc); args = new Object[pts.length]; for (int i = 0; i < args.length; i++) { try { //更具類型讀取請求參數值 //調用ObjectInput的readObject()反序列化方法,反序列化出參數值 args[i] = in.readObject(pts[i]); } catch (Exception e) { if (log.isWarnEnabled()) { log.warn("Decode argument failed: " + e.getMessage(), e); } } } } //設置保存請求參數類型 setParameterTypes(pts); //反序列化,attachment map //調用ObjectInput的readObject()反序列化方法,反序列化出attachemnet值 Map<String, String> map = (Map<String, String>) in.readObject(Map.class); if (map != null && map.size() > 0) { Map<String, String> attachment = getAttachments(); if (attachment == null) { attachment = new HashMap<String, String>(); } attachment.putAll(map); setAttachments(attachment); } //decode argument ,may be callback 回調參數設置,這個再說。 for (int i = 0; i < args.length; i++) { args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]); } //保存請求參數值 setArguments(args); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read invocation data failed.", e)); } return this; }
RPC調用結果:DecodeableRpcResult 反序列化 方法decode()
public void decode() throws Exception { if (!hasDecoded && channel != null && inputStream != null) { try {//具體實如今decode(channel, inputStream)重載方法裏 decode(channel, inputStream); } catch (Throwable e) {//設置異常返回response if (log.isWarnEnabled()) { log.warn("Decode rpc result failed: " + e.getMessage(), e); } response.setStatus(Response.CLIENT_ERROR); response.setErrorMessage(StringUtils.toString(e)); } finally { hasDecoded = true; } } } /*** * 反序列化,解碼過程,讀取input的調用結果字節數據,經反序列化成方法返回類型對象 * 併發放回結果設置到RpcResult的result字段裏 * 以及異常返回字段的設置。 * @param channel channel. * @param input input stream. * @return * @throws IOException */ public Object decode(Channel channel, InputStream input) throws IOException { //獲取反序列化方案 ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); //反序列化出 結果標識 調用ObjectInput的readByte方法,反序列化出一個byte值 byte flag = in.readByte();// switch (flag) { case DubboCodec.RESPONSE_NULL_VALUE://null 值 break; case DubboCodec.RESPONSE_VALUE: //非空值 try { //根據invocation獲取調用方法的放回類型 Type[] returnType = RpcUtils.getReturnTypes(invocation); //根據返回類型,反序列出結果並這是到RpcResult 的result字段裏。 //void 類型 結果值 是null ;int 等基本類型,自動裝箱 Integer; //具體調用ObjectInput的readObject重載的兩個方法,反序列出結果對象 setValue(returnType == null || returnType.length == 0 ? in.readObject() : (returnType.length == 1 ? in.readObject((Class<?>) returnType[0]) : in.readObject((Class<?>) returnType[0], returnType[1]))); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; case DubboCodec.RESPONSE_WITH_EXCEPTION://異常信息反序列化,設置到exception字段 try { //具體調用ObjectInput的readObject重載的兩個方法,反序列出異常對象 Object obj = in.readObject(); if (obj instanceof Throwable == false) throw new IOException("Response data error, expect Throwable, but get " + obj); setException((Throwable) obj); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; default: throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); } return this; }
下一篇再說說反序列化。