關於Dubbo的總體設計能夠查看官方文檔,下圖能夠清晰的表達Dubbo的總體設計:java
圖中左邊淡藍背景的爲服務消費方使用的接口,右邊淡綠色背景的爲服務提供方使用的接口,位於中軸線上的爲雙方都用到的接口;
圖中從下至上分爲十層,各層均爲單向依賴,右邊的黑色箭頭表明層之間的依賴關係;
圖中綠色小塊的爲擴展接口,藍色小塊爲實現類,圖中只顯示用於關聯各層的實現類;
圖中藍色虛線爲初始化過程,即啓動時組裝鏈,紅色實線爲方法調用過程,即運行時調時鏈,紫色三角箭頭爲繼承,能夠把子類看做父類的同一個節點,線上的文字爲調用的方法;git
config 配置層:對外配置接口,以 ServiceConfig, ReferenceConfig 爲中心,能夠直接初始化配置類,也能夠經過 spring 解析配置生成配置類;
proxy 服務代理層:服務接口透明代理,生成服務的客戶端 Stub 和服務器端 Skeleton, 以 ServiceProxy 爲中心,擴展接口爲 ProxyFactory;
registry 註冊中心層:封裝服務地址的註冊與發現,以服務 URL 爲中心,擴展接口爲 RegistryFactory, Registry, RegistryService;
cluster 路由層:封裝多個提供者的路由及負載均衡,並橋接註冊中心,以 Invoker 爲中心,擴展接口爲 Cluster, Directory, Router, LoadBalance;
monitor 監控層:RPC 調用次數和調用時間監控,以 Statistics 爲中心,擴展接口爲 MonitorFactory, Monitor, MonitorService;
protocol 遠程調用層:封裝 RPC 調用,以 Invocation, Result 爲中心,擴展接口爲 Protocol, Invoker, Exporter;
exchange 信息交換層:封裝請求響應模式,同步轉異步,以 Request, Response 爲中心,擴展接口爲 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer;
transport 網絡傳輸層:抽象 mina 和 netty 爲統一接口,以 Message 爲中心,擴展接口爲 Channel, Transporter, Client, Server, Codec;
serialize 數據序列化層:可複用的一些工具,擴展接口爲 Serialization, ObjectInput, ObjectOutput, ThreadPool;github
本文將從最底層的serialize層開始來對dubbo進行源碼分析;spring
dubbo的底層通信使用的是第三方框架,包括:netty,netty4,mina和grizzly;默認使用的是netty,分別提供了server端(服務提供方)和client端(服務消費方);下面已使用的netty爲例來看那一下NettyServer的部分代碼:json
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
在啓動服務提供方時就會調用此doOpen方法,用來啓動服務端口,供消費方鏈接;以上代碼就是常規的啓動nettyServer端代碼,由於本文重點介紹dubbo的序列化,因此這裏主要看decoder和encoder,這兩個類分別定義在NettyCodecAdapter中:bootstrap
private final ChannelHandler encoder = new InternalEncoder(); private final ChannelHandler decoder = new InternalDecoder();
在NettyCodecAdapter定義了內部類InternalEncoder:緩存
private class InternalEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception { com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { codec.encode(channel, buffer, msg); } finally { NettyChannel.removeChannelIfDisconnected(ch); } return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer()); } }
此類實際上是對codec的包裝,自己並無作編碼處理,下面重點看一下codec類,此類是一個接口類,有多種實現類,Codec2源碼以下:服務器
@SPI public interface Codec2 { @Adaptive({Constants.CODEC_KEY}) void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException; @Adaptive({Constants.CODEC_KEY}) Object decode(Channel channel, ChannelBuffer buffer) throws IOException; enum DecodeResult { NEED_MORE_INPUT, SKIP_SOME_INPUT } }
實現包括:TransportCodec,TelnetCodec,ExchangeCodec,DubboCountCodec以及ThriftCodec,固然也能夠自行擴展;不可能啓動時把每種類型都加載,dubbo是經過在配置文件中配置好全部的類型,而後在運行中須要什麼類加載什麼類,
配置文件的具體路徑:META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Codec2,內容以下:網絡
transport=com.alibaba.dubbo.remoting.transport.codec.TransportCodec telnet=com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec exchange=com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftCodec
獲取具體Codec2的代碼以下:app
protected static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } }
經過在url中獲取是否有關鍵字codec,若是有的話就獲取當前的值,dubbo默認的codec爲dubbo;若是沒有值默認爲telnet;這裏有默認值爲dubbo,因此實現類DubboCountCodec會被ExtensionLoader進行加載並進行緩存,下面具體看一下DubboCountCodec的編解碼;
private DubboCodec codec = new DubboCodec(); @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { codec.encode(channel, buffer, msg); }
DubboCountCodec內部調用的是DubboCodec的encode方法,看一下如何對Request對象進行編碼的,具體代碼塊以下:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header. byte[] header = new byte[HEADER_LENGTH]; // set magic number. Bytes.short2bytes(MAGIC, header); // set request and serialization flag. header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; if (req.isEvent()) header[2] |= FLAG_EVENT; // set request id. Bytes.long2bytes(req.getId(), header, 4); // encode request data. int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12); // write buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }
前兩個字節存放了魔數:0xdabb;第三個字節包含了四個信息分別是:是不是請求消息(仍是響應消息),序列化類型,是否雙向通訊,是不是心跳消息;
在請求消息中直接跳過了第四個字節,直接在5-12位置存放了requestId,是一個long類型,第四個字節在若是是編碼響應消息中會存放響應的狀態;
代碼往下看,buffer跳過了HEADER_LENGTH長度的字節,這裏表示的是header部分的長度爲16個字節,而後經過指定的序列化方式把data對象序列化到buffer中,序列化以後能夠獲取到data對象總共的字節數,用一個int類型來保存字節數,此int類型存放在header的最後四個字節中;
最後把buffer的writerIndex設置到寫完header和data的地方,防止數據被覆蓋;
在NettyCodecAdapter定義了內部類InternalEncoder,一樣是調用DubboCodec的decode方法,部分代碼以下:
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; buffer.readBytes(header); return decode(channel, buffer, readable, header); } @Override protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i++) { if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } return super.decode(channel, buffer, readable, header); } // check length. if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // get data length. int len = Bytes.bytes2int(header, 12); checkPayload(channel, len); int tt = len + HEADER_LENGTH; if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } // limit input stream. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
首先讀取Math.min(readable, HEADER_LENGTH),若是readable小於HEADER_LENGTH,表示接收方連頭部的16個字節還沒接受完,須要等待接收;正常header接收完以後須要進行檢查,主要包括:魔數的檢查,header消息長度檢查,消息體長度檢查(檢查消息體是否已經接收完成);檢查完以後須要對消息體進行反序列化,具體在decodeBody方法中:
@Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // get request id. long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) { // decode response. Response res = new Response(id); if ((flag & FLAG_EVENT) != 0) { res.setEvent(Response.HEARTBEAT_EVENT); } // get status. byte status = header[3]; res.setStatus(status); if (status == Response.OK) { try { Object data; if (res.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (res.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcResult result; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); result.decode(); } else { result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } res.setResult(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode response failed: " + t.getMessage(), t); } res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } } else { res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); } return res; } else { // decode request. Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); req.setTwoWay((flag & FLAG_TWOWAY) != 0); if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (req.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); inv.decode(); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } req.setData(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode request failed: " + t.getMessage(), t); } // bad request req.setBroken(true); req.setData(t); } return req; } }
首先經過解析header部分的第三個字節,識別出是請求消息仍是響應消息,還有使用哪一種類型的序列化方式,而後分別進行序列化;
經過以上對編碼器解碼器的瞭解,在編碼器中須要序列化Request/Response,在解碼器中須要序列化Request/Response,下面具體看看序列化和反序列化;
在編碼器中須要獲取具體的Serialization,具體代碼以下:
public static Serialization getSerialization(URL url) { return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension( url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION)); }
同獲取codec的方式,dubbo也提供了多種序列化方式,同時能夠自定義擴展;經過在url中獲取serialization關鍵字,若是獲取不到默認爲hession2;一樣多種序列化類也配置在一個文件中,
路徑:META-INF/dubbo/internal/com.alibaba.dubbo.common.serialize.Serialization,具體內容以下:
fastjson=com.alibaba.dubbo.common.serialize.fastjson.FastJsonSerialization fst=com.alibaba.dubbo.common.serialize.fst.FstSerialization hessian2=com.alibaba.dubbo.common.serialize.hessian2.Hessian2Serialization java=com.alibaba.dubbo.common.serialize.java.JavaSerialization compactedjava=com.alibaba.dubbo.common.serialize.java.CompactedJavaSerialization nativejava=com.alibaba.dubbo.common.serialize.nativejava.NativeJavaSerialization kryo=com.alibaba.dubbo.common.serialize.kryo.KryoSerialization
dubbo默認提供了fastjson,fst,hessian2,java,compactedjava,nativejava和kryo多種序列化方式;
每種序列化方式都須要實現以下三個接口類:Serialization,ObjectInput以及ObjectOutput;
Serialization接口類:
public interface Serialization { byte getContentTypeId(); String getContentType(); @Adaptive ObjectOutput serialize(URL url, OutputStream output) throws IOException; @Adaptive ObjectInput deserialize(URL url, InputStream input) throws IOException; }
其中的ContentTypeId就是在header中存放的序列化類型,反序列化的時候須要經過此id獲取具體的Serialization,因此此ContentTypeId不能出現重複的,不然會被覆蓋;
ObjectInput接口類:
public interface ObjectOutput extends DataOutput { void writeObject(Object obj) throws IOException; }
ObjectOutput接口類:
public interface ObjectInput extends DataInput { Object readObject() throws IOException, ClassNotFoundException; <T> T readObject(Class<T> cls) throws IOException, ClassNotFoundException; <T> T readObject(Class<T> cls, Type type) throws IOException, ClassNotFoundException; }
分別提供了讀取對象和寫對象的接口方法,DataOutput和DataInput分別提供了對基本數據類型的讀和寫;序列化只須要調用writeObject方法將Data寫入數據流便可;具體能夠看一下編碼器中調用的encodeRequestData方法:
@Override protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; out.writeUTF(version); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); out.writeUTF(inv.getMethodName()); out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++) { out.writeObject(encodeInvocationArgument(channel, inv, i)); } out.writeObject(inv.getAttachments()); }
默認使用的DubboCountCodec方式並無直接將data寫入流中,而是將RpcInvocation中的數據取出分別寫入流;
反序列化經過讀取header中的序列化類型,而後經過以下方法獲取具體的Serialization,具體在類CodecSupport中:
public static Serialization getSerialization(URL url, Byte id) throws IOException { Serialization serialization = getSerializationById(id); String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); // Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose. if (serialization == null || ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) { throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id."); } return serialization; } private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>(); public static Serialization getSerializationById(Byte id) { return ID_SERIALIZATION_MAP.get(id); }
ID_SERIALIZATION_MAP存放着ContentTypeId和具體Serialization的對應關係,而後經過id獲取具體的Serialization,而後根據寫入的順序讀取數據;
dubbo自己對不少模塊提供了很好的擴展功能,包括序列化功能,如下來分析一下如何使用protobuf來實現序列化方式;
首先看一下總體的代碼結構,以下圖所示:
分別實現三個接口類:Serialization,ObjectInput以及ObjectOutput;而後在指定目錄下提供一個文本文件;
<dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.1.3</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.1.3</version> </dependency>
public class ProtobufObjectInput implements ObjectInput { private ObjectInputStream input; public ProtobufObjectInput(InputStream inputStream) throws IOException { this.input = new ObjectInputStream(inputStream); } ....省略基礎類型... @Override public Object readObject() throws IOException, ClassNotFoundException { return input.readObject(); } @Override public <T> T readObject(Class<T> clazz) throws IOException { try { byte[] buffer = (byte[]) input.readObject(); input.read(buffer); return SerializationUtil.deserialize(buffer, clazz); } catch (Exception e) { throw new IOException(e); } } @Override public <T> T readObject(Class<T> clazz, Type type) throws IOException { try { byte[] buffer = (byte[]) input.readObject(); input.read(buffer); return SerializationUtil.deserialize(buffer, clazz); } catch (Exception e) { throw new IOException(e); } } } public class ProtobufObjectOutput implements ObjectOutput { private ObjectOutputStream outputStream; public ProtobufObjectOutput(OutputStream outputStream) throws IOException { this.outputStream = new ObjectOutputStream(outputStream); } ....省略基礎類型... @Override public void writeObject(Object v) throws IOException { byte[] bytes = SerializationUtil.serialize(v); outputStream.writeObject(bytes); outputStream.flush(); } @Override public void flushBuffer() throws IOException { outputStream.flush(); } }
4.實現Serialization接口
public class ProtobufSerialization implements Serialization { @Override public byte getContentTypeId() { return 10; } @Override public String getContentType() { return "x-application/protobuf"; } @Override public ObjectOutput serialize(URL url, OutputStream out) throws IOException { return new ProtobufObjectOutput(out); } @Override public ObjectInput deserialize(URL url, InputStream is) throws IOException { return new ProtobufObjectInput(is); } }
這裏引入了一個新的ContentTypeId,須要保證和dubbo裏面已存在的不要衝突
在META-INF/dubbo/internal/目錄下提供文件com.alibaba.dubbo.common.serialize.Serialization,內容以下:
protobuf=com.dubboCommon.ProtobufSerialization
<dubbo:protocol name="dubbo" port="20880" serialization="protobuf"/>
這樣就會使用新擴展的protobuf序列化方式來序列化對象;
本文從dubbo總體設計的最底層serialization層來分析和了解dubbo,後面會逐層進行分析,對dubbo有一個更加透徹的瞭解;