對於遠程通訊,每每都會涉及到數據持久化傳輸問題。往大了說,就是,從A發出的信息,怎樣能被B接收到相同信息內容!小點說就是,編碼與解碼問題!java
而在dubbo或者說是java的遠程通訊中,編解碼則每每伴隨着序列化與反序列化!bootstrap
普通java對象要想實現序列化,通常有幾個步驟:數組
1. 實現 Serializable 接口;緩存
2. 生成一個序列號: serialVersionUID, (非必須,但建議);網絡
3. 重寫 writeObject()/readObject() 自定義序列化,若有必要的話;app
4. 調用 java.io.ObjectOutputStream 的 writeObject()/readObject() 進行序列化與反序列化;框架
簡單吧,可是你們知道,市面上有許許多多的序列化框架!爲啥呢?由於它們須要速度更快,體積更小!異步
今天咱們就來細看下dubbo的默認序列化器 Hession2 是怎麼作的吧!socket
從Server初始化處開始, 能夠看到, 咱們使用 默認的 dubbo 是基於 netty 來建立 server的.tcp
// com.alibaba.dubbo.remoting.transport.netty.NettyServer @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org.browse.NETTY-365 // https://issues.jboss.org.browse.NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ // encoder 是一個基於netty解碼器的子類: NettyCodecAdapter.InternalDecoder extends SimpleChannelUpstreamHandler pipeline.addLast("decoder", adapter.getDecoder()); // decoder 是一個基於netty編碼器的子類: NettyCodecAdapter.InternalEncoder extends OneToOneEncoder pipeline.addLast("encoder", adapter.getEncoder()); // handler 則處理全部的業務邏輯處理 pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
server 中使用了管道來進行通訊,主要有三個 ChannelHandler:
1. decoder, 負責消息解碼, 依賴於netty基礎設施;
2. encoder, 負責消息的編碼工做, 依賴於netty基礎設施;(本文的主要目標)
3. 業務處理的 handler, NettyHandler;
這幾個管道的流向如netty中闡述的同樣,會隨出站和入站的步驟進行流動; 本文講解出站請求,因此步驟會是 handler -> encoder -> decoder
// com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel 此處開始發送請求數據 @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. // Request 中有個有意思的變量: private static final AtomicLong INVOKE_ID = new AtomicLong(0); 負責維護本地的請求序號 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { // 首先會調用 com.alibaba.dubbo.remoting.transport.netty.NettyClient 的 send() 方法 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
通過 HeaderExchangeChannel 的封裝後,就有了 Request 請求, 接下來就是發送往遠程的過程了!幾個要點:
1. 每一個請求都會有序列號,依次遞增;
2. 設置爲雙向通訊,即 twoWay=true, 既發送也接收請求;
3. 使用 DefaultFuture 封裝返回值,接收異步結果;
// NettyClient, com.alibaba.dubbo.remoting.transport.AbstractPeer @Override public void send(Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false)); } // NettyClient, com.alibaba.dubbo.remoting.transport.AbstractClient @Override public void send(Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()) { connect(); } Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } // 調用另外一個 channel 寫入 channel.send(message, sent); } // com.alibaba.dubbo.remoting.transport.netty.NettyChannel @Override public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 寫業務由此觸發,返回一個 異步 Future ChannelFuture future = channel.write(message); if (sent) { // 若是是發送請求, 則而後再阻塞等待結果 // 還有另外一個阻塞等待結果的地方,是在 DubboInvoker 中 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await(timeout); } // 若是發現異常,則將異步異常拋出 Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
談到對future結果的處理,咱們仍是倒回到 DubboInvoker 中,進行看下是怎樣處理的!
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { // 若是單向發送的包,則直接忽略結果便可 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // 針對設置爲異步的請求,直接將future設置到上下文後,返回空結果便可 ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 針對同步請求 // 發起遠程請求, 獲取到 future 異步結果, 調用 future.get() 同步阻塞,等待結果後返回 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
接下就正式進入到 Socket 的髮網絡發送流程中了,咱們來看下都是怎麼作的!(注意: 如今的數據仍是原始數據,並無序列化)
// org.jboss.netty.channel.socket.nio.NioClientSocketChannel, org.jboss.netty.channel.AbstractChannel public ChannelFuture write(Object message) { return Channels.write(this, message); } // org.jboss.netty.channel.Channels /** * Sends a {@code "write"} request to the last * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel}. * * @param channel the channel to write a message * @param message the message to write to the channel * * @return the {@link ChannelFuture} which will be notified when the * write operation is done */ public static ChannelFuture write(Channel channel, Object message) { return write(channel, message, null); } // 寫入數據到管道尾部, 一切看起來都很美好,返回 future 了事! 進入 pipeline 以後,就會調用一系列的 鏈處理,如加解碼 /** * Sends a {@code "write"} request to the last * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel}. * * @param channel the channel to write a message * @param message the message to write to the channel * @param remoteAddress the destination of the message. * {@code null} to use the default remote address * * @return the {@link ChannelFuture} which will be notified when the * write operation is done */ public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) { ChannelFuture future = future(channel); channel.getPipeline().sendDownstream( new DownstreamMessageEvent(channel, future, message, remoteAddress)); return future; } // org.jboss.netty.channel.DefaultChannelPipeline public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); if (tail == null) { try { getSink().eventSunk(this, e); return; } catch (Throwable t) { notifyHandlerException(e, t); return; } } // 添加到 tail 中 sendDownstream(tail, e); } void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) { if (e instanceof UpstreamMessageEvent) { throw new IllegalArgumentException("cannot send an upstream event to downstream"); } try { // 調用下一個 pipeline 的處理方法 handler 的處理 handleDownstream(), 即調用 NettyHandler 了 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e); } catch (Throwable t) { // Unlike an upstream event, a downstream event usually has an // incomplete future which is supposed to be updated by ChannelSink. // However, if an exception is raised before the event reaches at // ChannelSink, the future is not going to be updated, so we update // here. e.getFuture().setFailure(t); notifyHandlerException(e, t); } } // NettyHandler, org.jboss.netty.channel.SimpleChannelHandler /** * {@inheritDoc} Down-casts the received downstream event into more * meaningful sub-type event and calls an appropriate handler method with * the down-casted event. */ public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof MessageEvent) { // 消息發送走數據寫入邏輯, ctx 便是 上下文的末端 tail writeRequested(ctx, (MessageEvent) e); } else if (e instanceof ChannelStateEvent) { // 事件類處理邏輯 ChannelStateEvent evt = (ChannelStateEvent) e; switch (evt.getState()) { case OPEN: if (!Boolean.TRUE.equals(evt.getValue())) { closeRequested(ctx, evt); } break; case BOUND: if (evt.getValue() != null) { bindRequested(ctx, evt); } else { unbindRequested(ctx, evt); } break; case CONNECTED: if (evt.getValue() != null) { connectRequested(ctx, evt); } else { disconnectRequested(ctx, evt); } break; case INTEREST_OPS: setInterestOpsRequested(ctx, evt); break; default: ctx.sendDownstream(e); } } else { ctx.sendDownstream(e); } }
接下來調用 handler 的 writeRequest(), 進行 pipeline 管道式調用:
// NettyHandler, com.alibaba.dubbo.remoting.transport.netty.NettyHandler @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { // 先調用父類, SimpleChannelHandler, 處理鏈上的邏輯 super.writeRequested(ctx, e); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.sent(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } // org.jboss.netty.channel.SimpleChannelHandler /** * Invoked when {@link Channel#write(Object)} is called. */ public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ctx.sendDownstream(e); } // org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext public void sendDownstream(ChannelEvent e) { // 查看是否有上一節點,若是有,遞歸調用。 即: pipeline 管道效果,依次流過事件處理 // 所謂的 pipeline 鏈的實現原理哦 DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev); if (prev == null) { try { getSink().eventSunk(DefaultChannelPipeline.this, e); } catch (Throwable t) { notifyHandlerException(e, t); } } else { DefaultChannelPipeline.this.sendDownstream(prev, e); } } // DefaultChannelPipeline.this.sendDownstream() , 調用業務的 handler 進行處理, 即編碼解碼過程 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) { if (e instanceof UpstreamMessageEvent) { throw new IllegalArgumentException("cannot send an upstream event to downstream"); } try { ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e); } catch (Throwable t) { // Unlike an upstream event, a downstream event usually has an // incomplete future which is supposed to be updated by ChannelSink. // However, if an exception is raised before the event reaches at // ChannelSink, the future is not going to be updated, so we update // here. e.getFuture().setFailure(t); notifyHandlerException(e, t); } }
而後是編碼操做!
// 調用encoder, InternalEncoder 的父類: org.jboss.netty.handler.codec.oneone.OneToOneEncoder public void handleDownstream( ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { if (!(evt instanceof MessageEvent)) { ctx.sendDownstream(evt); return; } MessageEvent e = (MessageEvent) evt; // 編碼數據 if (!doEncode(ctx, e)) { ctx.sendDownstream(e); } } protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object originalMessage = e.getMessage(); // 此處 encode() 由子類實現, 從而實現自定義的編碼方式 Object encodedMessage = encode(ctx, e.getChannel(), originalMessage); if (originalMessage == encodedMessage) { return false; } if (encodedMessage != null) { // 編碼好後,就進行遠端的寫入了, tcp 協議, TruncatedChannelBuffer // 其實在 encode() 的時候已經將數據發送了 write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress()); } return true; } // com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder @Sharable private class InternalEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception { com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { // 調用 codec 的 encode(), 即 DubboCountCodec, 固然,該 codec 是能夠經過請求 url 參數裏指定的 codec.encode(channel, buffer, msg); } finally { NettyChannel.removeChannelIfDisconnected(ch); } // 使用 ChannelBuffers 包裝 _buffer 後,返回數據給到調用方,以便進行寫稿 return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer()); } } // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { codec.encode(channel, buffer, msg); } // DubboCountCodec, com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { // 編碼數據 encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } }
具體的寫入格式以下: 請求頭魔數 -> 請求序列化方式標識 -> 請求類型標識 -> 請求序列號 -> body
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec, com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // 寫入格式以下: 請求頭魔數 -> 請求序列化方式標識 -> 請求類型標識 -> 請求序列號 -> body // 全部數據寫入 buffer // header. byte[] header = new byte[HEADER_LENGTH]; // set magic number. Bytes.short2bytes(MAGIC, header); // set request and serialization flag. header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; if (req.isEvent()) header[2] |= FLAG_EVENT; // set request id. Bytes.long2bytes(req.getId(), header, 4); // encode request data. int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { // 編碼數據, 此處可能遇到複雜的對象, 解決辦法是 遞歸調用 JavaSerializer encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12); // write buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec , 進行數據編碼 @Override protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; out.writeUTF(version); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); out.writeUTF(inv.getMethodName()); out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++) { // 寫入單個字段數據 out.writeObject(encodeInvocationArgument(channel, inv, i)); } out.writeObject(inv.getAttachments()); }
接下來看看 hessian 是如何進行數據序列化的? 其實就是調用 hessian 的 writeObject() 方法原理!
// com.alibaba.dubbo.common.serialize.hessian2.Hessian2ObjectOutput @Override public void writeObject(Object obj) throws IOException { mH2o.writeObject(obj); } // com.alibaba.com.caucho.hessian.io.Hessian2Output /** * Writes any object to the output stream. */ @Override public void writeObject(Object object) throws IOException { if (object == null) { writeNull(); return; } Serializer serializer; // 獲取序列化器: 這個過程比較繁雜會不停搜索符合條件的 Serializer, 默認 JavaSerializer, 而後調用 writeObject() 方法, serializer = findSerializerFactory().getSerializer(object.getClass()); serializer.writeObject(object, this); } // 序列化器工廠類 com.alibaba.com.caucho.hessian.io.SerializerFactory /** * Returns the serializer for a class. * * @param cl the class of the object that needs to be serialized. * @return a serializer object for the serialization. */ @Override public Serializer getSerializer(Class cl) throws HessianProtocolException { Serializer serializer; // 1. 先嚐試從基本的序列化器中查找 serializer = (Serializer) _staticSerializerMap.get(cl); if (serializer != null) { return serializer; } // 2. 從緩存中查找, 若是以前找到過的話 if (_cachedSerializerMap != null) { serializer = (Serializer) _cachedSerializerMap.get(cl); if (serializer != null) { return serializer; } } // 3. 初始化多個工廠類 for (int i = 0; serializer == null && _factories != null && i < _factories.size(); i++) { AbstractSerializerFactory factory; factory = (AbstractSerializerFactory) _factories.get(i); serializer = factory.getSerializer(cl); } // 4. 簡單明確的複合對象斷定 if (serializer != null) { } else if (isZoneId(cl)) //must before "else if (JavaSerializer.getWriteReplace(cl) != null)" serializer = ZoneIdSerializer.getInstance(); else if (isEnumSet(cl)) serializer = EnumSetSerializer.getInstance(); else if (JavaSerializer.getWriteReplace(cl) != null) serializer = new JavaSerializer(cl, _loader); else if (HessianRemoteObject.class.isAssignableFrom(cl)) serializer = new RemoteSerializer(); // else if (BurlapRemoteObject.class.isAssignableFrom(cl)) // serializer = new RemoteSerializer(); else if (Map.class.isAssignableFrom(cl)) { if (_mapSerializer == null) _mapSerializer = new MapSerializer(); serializer = _mapSerializer; } else if (Collection.class.isAssignableFrom(cl)) { if (_collectionSerializer == null) { _collectionSerializer = new CollectionSerializer(); } serializer = _collectionSerializer; } else if (cl.isArray()) { // 數組序列化器 serializer = new ArraySerializer(); } else if (Throwable.class.isAssignableFrom(cl)) { serializer = new ThrowableSerializer(cl, getClassLoader()); } else if (InputStream.class.isAssignableFrom(cl)) { serializer = new InputStreamSerializer(); } else if (Iterator.class.isAssignableFrom(cl)) { serializer = IteratorSerializer.create(); } else if (Enumeration.class.isAssignableFrom(cl)) { serializer = EnumerationSerializer.create(); } else if (Calendar.class.isAssignableFrom(cl)) { serializer = CalendarSerializer.create(); } else if (Locale.class.isAssignableFrom(cl)) { serializer = LocaleSerializer.create(); } else if (Enum.class.isAssignableFrom(cl)) { serializer = new EnumSerializer(cl); } // 5. 都找不到則使用默認序列化器, 即普通繼承了 Serializable 接口的對象都是此類 if (serializer == null) { serializer = getDefaultSerializer(cl); } if (_cachedSerializerMap == null) { _cachedSerializerMap = new ConcurrentHashMap(8); } // 6. 將查找結果存入緩存 _cachedSerializerMap.put(cl, serializer); return serializer; } /** * Returns the default serializer for a class that isn't matched * directly. Application can override this method to produce * bean-style serialization instead of field serialization. * * @param cl the class of the object that needs to be serialized. * @return a serializer object for the serialization. */ protected Serializer getDefaultSerializer(Class cl) { if (_defaultSerializer != null) return _defaultSerializer; if (!Serializable.class.isAssignableFrom(cl) && !_isAllowNonSerializable) { throw new IllegalStateException("Serialized class " + cl.getName() + " must implement java.io.Serializable"); } return new JavaSerializer(cl, _loader); } // com.alibaba.com.caucho.hessian.ioSerializer @Override public void writeObject(Object obj, AbstractHessianOutput out) throws IOException { if (out.addRef(obj)) { return; } Class cl = obj.getClass(); try { // 若是有寫 writeReplace() 方法,則直接調用其便可; 不然遞歸簡單序列化結構 if (_writeReplace != null) { Object repl; if (_writeReplaceFactory != null) repl = _writeReplace.invoke(_writeReplaceFactory, obj); else repl = _writeReplace.invoke(obj); //Some class would return itself for wrapReplace, which would cause infinite recursion //In this case, we could write the object just like normal cases if (repl != obj) { out.removeRef(obj); out.writeObject(repl); out.replaceRef(repl, obj); return; } } } catch (RuntimeException e) { throw e; } catch (Exception e) { // log.log(Level.FINE, e.toString(), e); throw new RuntimeException(e); } // 寫參數類名 int ref = out.writeObjectBegin(cl.getName()); if (ref < -1) { writeObject10(obj, out); } else { if (ref == -1) { // 寫開始標識 writeDefinition20(out); out.writeObjectBegin(cl.getName()); } // 寫字段數據 writeInstance(obj, out); } } // 寫字段信息, 依次列舉字段,而後依次寫入, 若是是複合類型, public void writeInstance(Object obj, AbstractHessianOutput out) throws IOException { for (int i = 0; i < _fields.length; i++) { Field field = _fields[i]; _fieldSerializers[i].serialize(out, obj, field); } } // 寫字段信息, FieldSerializer 是個 JavaSerializer 的內部類 static class FieldSerializer { static final FieldSerializer SER = new FieldSerializer(); void serialize(AbstractHessianOutput out, Object obj, Field field) throws IOException { Object value = null; try { value = field.get(obj); } catch (IllegalAccessException e) { log.log(Level.FINE, e.toString(), e); } try { // 基於前面已經寫入的參數類型信息,如今寫入 value 便可, 若是value() 是複合類型,則再次遞歸處理 out.writeObject(value); } catch (RuntimeException e) { throw new RuntimeException(e.getMessage() + "\n Java field: " + field, e); } catch (IOException e) { throw new IOExceptionWrapper(e.getMessage() + "\n Java field: " + field, e); } } }
來看一下一個 string 的寫入方式吧!
// String 的編碼, 直接調用 out.writeString() static class StringFieldSerializer extends FieldSerializer { static final FieldSerializer SER = new StringFieldSerializer(); @Override void serialize(AbstractHessianOutput out, Object obj, Field field) throws IOException { String value = null; try { value = (String) field.get(obj); } catch (IllegalAccessException e) { log.log(Level.FINE, e.toString(), e); } // 將string 寫入 buffer 中 out.writeString(value); } } // com.alibaba.com.caucho.hessian.io.Hessian2Output 將 string 轉換爲 byte 存儲起來, 經過移位處理的方式 /** * Writes a string value to the stream using UTF-8 encoding. * The string will be written with the following syntax: * <p> * <code><pre> * S b16 b8 string-value * </pre></code> * <p> * If the value is null, it will be written as * <p> * <code><pre> * N * </pre></code> * * @param value the string value to write. */ @Override public void writeString(String value) throws IOException { int offset = _offset; byte[] buffer = _buffer; if (SIZE <= offset + 16) { flush(); offset = _offset; } if (value == null) { buffer[offset++] = (byte) 'N'; _offset = offset; } else { int length = value.length(); int strOffset = 0; // 針對長字符, 分段循環寫入 while (length > 0x8000) { int sublen = 0x8000; offset = _offset; if (SIZE <= offset + 16) { flush(); offset = _offset; } // chunk can't end in high surrogate char tail = value.charAt(strOffset + sublen - 1); if (0xd800 <= tail && tail <= 0xdbff) sublen--; buffer[offset + 0] = (byte) BC_STRING_CHUNK; buffer[offset + 1] = (byte) (sublen >> 8); buffer[offset + 2] = (byte) (sublen); _offset = offset + 3; printString(value, strOffset, sublen); length -= sublen; strOffset += sublen; } offset = _offset; if (SIZE <= offset + 16) { flush(); offset = _offset; } // 先寫入長度信息, 再寫入string內容 if (length <= STRING_DIRECT_MAX) { // STRING_DIRECT_MAX = 0x1f // b0 格式寫入 buffer[offset++] = (byte) (BC_STRING_DIRECT + length); } else if (length <= STRING_SHORT_MAX) { // STRING_SHORT_MAX = 0x3ff // b8, 0x30 + x >> 8 buffer[offset++] = (byte) (BC_STRING_SHORT + (length >> 8)); buffer[offset++] = (byte) (length); } else { // <= 0x8000, 存在兩位字節 // S len buffer[offset++] = (byte) ('S'); buffer[offset++] = (byte) (length >> 8); buffer[offset++] = (byte) (length); } _offset = offset; printString(value, strOffset, length); } } // 寫入string內容, 以utf-8編碼 /** * Prints a string to the stream, encoded as UTF-8 * * @param v the string to print. */ public void printString(String v, int strOffset, int length) throws IOException { int offset = _offset; byte[] buffer = _buffer; for (int i = 0; i < length; i++) { if (SIZE <= offset + 16) { _offset = offset; flush(); offset = _offset; } char ch = v.charAt(i + strOffset); if (ch < 0x80) buffer[offset++] = (byte) (ch); else if (ch < 0x800) { buffer[offset++] = (byte) (0xc0 + ((ch >> 6) & 0x1f)); buffer[offset++] = (byte) (0x80 + (ch & 0x3f)); } else { buffer[offset++] = (byte) (0xe0 + ((ch >> 12) & 0xf)); buffer[offset++] = (byte) (0x80 + ((ch >> 6) & 0x3f)); buffer[offset++] = (byte) (0x80 + (ch & 0x3f)); } } _offset = offset; }
解碼則是在 JavaDeserializer 中, 大概思路就是, 按照序列化的方向,反向拆解下就好了!例如:
public JavaDeserializer(Class cl) { _type = cl; _fieldMap = getFieldMap(cl); _readResolve = getReadResolve(cl); if (_readResolve != null) { _readResolve.setAccessible(true); } Constructor[] constructors = cl.getDeclaredConstructors(); long bestCost = Long.MAX_VALUE; for (int i = 0; i < constructors.length; i++) { Class[] param = constructors[i].getParameterTypes(); long cost = 0; for (int j = 0; j < param.length; j++) { cost = 4 * cost; if (Object.class.equals(param[j])) cost += 1; else if (String.class.equals(param[j])) cost += 2; else if (int.class.equals(param[j])) cost += 3; else if (long.class.equals(param[j])) cost += 4; else if (param[j].isPrimitive()) cost += 5; else cost += 6; } if (cost < 0 || cost > (1 << 48)) cost = 1 << 48; cost += (long) param.length << 48; if (cost < bestCost) { _constructor = constructors[i]; bestCost = cost; } } if (_constructor != null) { _constructor.setAccessible(true); Class[] params = _constructor.getParameterTypes(); _constructorArgs = new Object[params.length]; for (int i = 0; i < params.length; i++) { _constructorArgs[i] = getParamArg(params[i]); } } } static class ObjectFieldDeserializer extends FieldDeserializer { private final Field _field; ObjectFieldDeserializer(Field field) { _field = field; } @Override void deserialize(AbstractHessianInput in, Object obj) throws IOException { Object value = null; try { value = in.readObject(_field.getType()); _field.set(obj, value); } catch (Exception e) { logDeserializeError(_field, obj, value, e); } } }
寫入最後數據後,觸發異步 worker 進行發送處理!
// org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink, 寫入最後數據後,觸發異步 worker 進行發送處理 private static void handleAcceptedSocket(ChannelEvent e) { if (e instanceof ChannelStateEvent) { ChannelStateEvent event = (ChannelStateEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { channel.worker.close(channel, future); } break; case BOUND: case CONNECTED: if (value == null) { channel.worker.close(channel, future); } break; case INTEREST_OPS: channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBufferQueue.offer(event); assert offered; channel.worker.writeFromUserCode(channel); } } // org.jboss.netty.channel.socket.nio.AbstractNioWorker void writeFromUserCode(final AbstractNioChannel<?> channel) { if (!channel.isConnected()) { // 若是已斷開,則清理 buffer, 防止內存泄漏 cleanUpWriteBuffer(channel); return; } // 加入異步隊列處理, 成功則返回 if (scheduleWriteIfNecessary(channel)) { return; } // From here, we are sure Thread.currentThread() == workerThread. if (channel.writeSuspended) { return; } if (channel.inWriteNowLoop) { return; } // 當前線程,則直接寫 write0(channel); } protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) { Exception cause = null; boolean fireExceptionCaught = false; // Clean up the stale messages in the write buffer. synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; if (evt != null) { // Create the exception only once to avoid the excessive overhead // caused by fillStackTrace. if (channel.isOpen()) { cause = new NotYetConnectedException(); } else { cause = new ClosedChannelException(); } ChannelFuture future = evt.getFuture(); if (channel.currentWriteBuffer != null) { channel.currentWriteBuffer.release(); channel.currentWriteBuffer = null; } channel.currentWriteEvent = null; // Mark the event object for garbage collection. //noinspection UnusedAssignment evt = null; future.setFailure(cause); fireExceptionCaught = true; } Queue<MessageEvent> writeBuffer = channel.writeBufferQueue; for (;;) { evt = writeBuffer.poll(); if (evt == null) { break; } // Create the exception only once to avoid the excessive overhead // caused by fillStackTrace. if (cause == null) { if (channel.isOpen()) { cause = new NotYetConnectedException(); } else { cause = new ClosedChannelException(); } fireExceptionCaught = true; } evt.getFuture().setFailure(cause); } } if (fireExceptionCaught) { if (isIoThread(channel)) { fireExceptionCaught(channel, cause); } else { fireExceptionCaughtLater(channel, cause); } } } protected void write0(AbstractNioChannel<?> channel) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; boolean iothread = isIoThread(channel); long writtenBytes = 0; final SocketSendBufferPool sendBufferPool = this.sendBufferPool; final WritableByteChannel ch = channel.channel; final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); List<Throwable> causes = null; synchronized (channel.writeLock) { channel.inWriteNowLoop = true; for (;;) { MessageEvent evt = channel.currentWriteEvent; SendBuffer buf = null; ChannelFuture future = null; try { if (evt == null) { if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { removeOpWrite = true; channel.writeSuspended = false; break; } future = evt.getFuture(); channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); } else { future = evt.getFuture(); buf = channel.currentWriteBuffer; } long localWrittenBytes = 0; for (int i = writeSpinCount; i > 0; i --) { localWrittenBytes = buf.transferTo(ch); if (localWrittenBytes != 0) { writtenBytes += localWrittenBytes; break; } if (buf.finished()) { break; } } if (buf.finished()) { // Successful write - proceed to the next message. buf.release(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; // Mark the event object for garbage collection. //noinspection UnusedAssignment evt = null; buf = null; future.setSuccess(); } else { // Not written fully - perhaps the kernel buffer is full. addOpWrite = true; channel.writeSuspended = true; if (writtenBytes > 0) { // Notify progress listeners if necessary. future.setProgress( localWrittenBytes, buf.writtenBytes(), buf.totalBytes()); } break; } } catch (AsynchronousCloseException e) { // Doesn't need a user attention - ignore. } catch (Throwable t) { if (buf != null) { buf.release(); } channel.currentWriteEvent = null; channel.currentWriteBuffer = null; // Mark the event object for garbage collection. //noinspection UnusedAssignment buf = null; //noinspection UnusedAssignment evt = null; if (future != null) { future.setFailure(t); } if (iothread) { // An exception was thrown from within a write in the iothread. We store a reference to it // in a list for now and notify the handlers in the chain after the writeLock was released // to prevent possible deadlock. // See #1310 if (causes == null) { causes = new ArrayList<Throwable>(1); } causes.add(t); } else { fireExceptionCaughtLater(channel, t); } if (t instanceof IOException) { // close must be handled from outside the write lock to fix a possible deadlock // which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed // and a close is triggered while the lock is hold. This is because the close(..) // may try to submit a task to handle it via the ExecutorHandler which then deadlocks. // See #1310 open = false; } } } channel.inWriteNowLoop = false; // Initially, the following block was executed after releasing // the writeLock, but there was a race condition, and it has to be // executed before releasing the writeLock: // // https://issues.jboss.org/browse/NETTY-410 // if (open) { if (addOpWrite) { setOpWrite(channel); } else if (removeOpWrite) { clearOpWrite(channel); } } } if (causes != null) { for (Throwable cause: causes) { // notify about cause now as it was triggered in the write loop fireExceptionCaught(channel, cause); } } if (!open) { // close the channel now close(channel, succeededFuture(channel)); } // 鉤子事件 if (iothread) { fireWriteComplete(channel, writtenBytes); } else { fireWriteCompleteLater(channel, writtenBytes); } }