These days, building a networked application is remarkably convenient: pull in a framework, set a few parameters, write your business code, and you are done.

Writing business code certainly matters. But do you know:

Where your data comes from? Over the network, obviously.

How the network transports it? Over fiber, via TCP/IP, obviously.

None of this seems to stump anyone. But those questions are not today's focus. What we want to know today is: how does TCP/IP deliver data to our application server, and hand it accurately to the right piece of business code?

We will not dwell on TCP's three-way handshake or four-way teardown either. We only need one fact: TCP/IP is stream-oriented, meaning bytes flow continuously from client to server. How, then, does the application layer know what those bytes mean? That is the job of an application-level protocol, such as HTTP, SMTP, or FTP.

Setting everything else aside: when we build applications with netty, netty itself plays the role of such a higher-level protocol. So by watching how it makes sense of the bytes coming off the wire, we can get a glimpse of how application-layer protocols work.

The broad strokes are simple: the client serializes data with some serialization protocol and sends it over the network; the server deserializes it with the matching protocol and hands the result to business code.

So at first glance this looks like nothing more than a serialization/deserialization problem. If that were all, there would be nothing left to think about today.

What we must consider is: does the data a client sends reach the server in one piece? If so, life is easy: just read it. But if the payload is very large, can TCP/IP transmit it in one shot? It cannot: TCP has an MSS (maximum segment size) limit, and anything larger must be split across segments. (The famous "packet splitting" and "packet sticking" problems. Very professional-sounding.)
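To make this concrete before diving into netty, here is a minimal sketch of the whole round trip using plain JDK sockets (my own illustration, not dubbo or netty code, assuming a 4-byte length prefix). The sender writes the payload length and then the bytes; the receiver reads the length, then keeps reading until the full frame has arrived, however TCP chose to segment it:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class LengthPrefixedDemo {

    // sender: write a 4-byte length header followed by the payload,
    // so the receiver can reassemble however TCP splits the stream
    static void send(Socket socket, byte[] payload) throws IOException {
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    // receiver: readFully blocks until the whole frame has arrived,
    // even if it was delivered in several TCP segments
    static byte[] receive(Socket socket) throws IOException {
        DataInputStream in = new DataInputStream(socket.getInputStream());
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }
}
```

The splitting problem is hidden inside readFully here: it simply loops over as many reads as needed until the declared length is satisfied. Everything that follows is, in essence, a more robust, non-blocking version of this idea.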
Let's look at how netty handles this incoming data.

And in dubbo, how is netty used to deal with packet splitting?

First, here is how dubbo sets up netty (mainly registering a couple of codecs plus the business handler):
```java
// org.apache.dubbo.remoting.transport.netty4.NettyServer
@Override
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging", new LoggingHandler(LogLevel.INFO)) // for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
```
Using netty really is that simple: define your protocol and your handlers, and all the messy low-level work is simply not your problem!

Let's first see how netty listens for incoming network traffic (binding the port with nio):
```java
// io.netty.channel.socket.nio.NioServerSocketChannel
// bind the server socket onto the underlying nio channel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

@Override
protected ServerSocketChannel javaChannel() {
    return (ServerSocketChannel) super.javaChannel();
}
```
So writing your own nio server/client may not be that hard after all. The trouble is the sheer number of abnormal situations a real application must handle; getting all of those right is far less certain.
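For a taste of what that involves, here is a minimal hand-rolled NIO server sketch (my own illustration, not netty code): single-threaded, echoing whatever it reads, with no partial-write handling and no frame reassembly, which is precisely the sort of corner case netty covers for us.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class TinyNioServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        // the "event loop": block until something is ready, then dispatch
        while (true) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    // a new connection: register it for reads
                    SocketChannel ch = server.accept();
                    if (ch == null) {
                        continue;
                    }
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // data arrived: read one buffer's worth and echo it back
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int n = ch.read(buf);
                    if (n < 0) {
                        ch.close();
                        continue;
                    }
                    buf.flip();
                    ch.write(buf); // naive: may not write everything in one call
                }
            }
        }
    }
}
```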
netty's threading model is the reactor pattern, built around an event loop:
```java
// io.netty.channel.nio.NioEventLoop
// the event loop polls for events
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // handle the ready events
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

// dispatch the ready events
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // process using the optimized selectedKeys set
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // read the data: the unsafe class performs the actual read loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

// io.netty.channel.nio.AbstractNioMessageChannel
// the real read path
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                // read in a loop, collecting messages into readBuf
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    // record how many messages have been read
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // hand each message to the pipeline in turn
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

// io.netty.channel.socket.nio.NioServerSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

// io.netty.channel.DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// io.netty.channel.AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    // if already on the event loop, invoke directly; otherwise submit it as a task
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // invoke the inbound handler to consume the message
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
} // DefaultChannelPipeline

// io.netty.channel.AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

// io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

// DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        // append the context to the tail of the pipeline
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

// NioEventLoopGroup
// io.netty.channel.MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

// io.netty.channel.SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // once registered here, a separate event loop thread takes over the data handling
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// io.netty.channel.AbstractChannel$AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

// io.netty.util.concurrent.SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        // wake the selector up for the next round of receiving data
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            // hand the work off to another thread; this stage of processing is done
            doStartThread();
        }
    }
}
```
A new thread is then started to run the processing logic:
```java
// start a new thread to run the processing logic
// the event is handed over to another executor for processing
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
```

The actual parsing of incoming data is triggered by fireChannelRead.

```java
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// walk the inbound chain, invoking channelRead() on each handler in turn
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

// HeadContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

// io.netty.handler.codec.ByteToMessageDecoder
// this is the class that drives our decoding of the data
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            // re-accumulate across packets that arrive in several pieces
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // run the decode method
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            // if any messages were decoded, pass them on to the next inbound handler
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

/**
 * Called once data should be decoded from the given {@link ByteBuf}. This method will call
 * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param in the {@link ByteBuf} from which to read data
 * @param out the {@link List} to which decoded messages should be added
 */
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // loop, calling decode for as long as there are readable bytes
        while (in.isReadable()) {
            int outSize = out.size();

            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            // call the user-supplied decode method to reassemble the data;
            // business processing is composed by adding handlers to the pipeline
            decode(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }

            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

/**
 * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
 */
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    // fireChannelRead is called once per decoded element
    for (int i = 0; i < numElements; i ++) {
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}
```
If we wrote this frame-assembly logic ourselves, it might look like this (simply wait until all the data has arrived, then pass it to the next handler):
```java
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 4) {
        return;
    }
    in.markReaderIndex();
    int dataLength = in.readInt();
    // if the frame is not complete yet, wait for the next call
    if (in.readableBytes() < dataLength) {
        in.resetReaderIndex();
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);
    Object obj = JSON.parseObject(data, target);
    out.add(obj);
}
```
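For completeness, the outbound side would simply mirror that decoder: write the length first, then the body. A minimal sketch, assuming the same fastjson serialization as the `JSON.parseObject` call above (the class name is made up):

```java
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class LengthPrefixedJsonEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        byte[] data = JSON.toJSONBytes(msg); // serialize the body first
        out.writeInt(data.length);           // 4-byte length header, read back by decode()
        out.writeBytes(data);
    }
}
```

MessageToByteEncoder is the outbound counterpart of ByteToMessageDecoder: install one on each end of the pipeline and the length-prefix protocol is complete.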
When the inbound path is invoked several times for one logical message, the pieces are stitched together by the cumulate method:
```java
// when channelRead is entered several times for one message,
// the pieces are assembled via the cumulate method
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                // merge the new data into the accumulated buffer
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

/**
 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
 */
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        final ByteBuf buffer;
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
            // Expand cumulation (by replace it) when either there is not more room in the buffer
            // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
            // duplicate().retain() or if its read-only.
            //
            // See:
            // - https://github.com/netty/netty/issues/2327
            // - https://github.com/netty/netty/issues/1764
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};
```
Now, how does dubbo assemble its data frames? (This is where NEED_MORE_INPUT comes in.)
```java
// the decoder's handling logic
// org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

        ChannelBuffer message = new NettyBackedChannelBuffer(input);

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        try {
            // decode object.
            do {
                int saveReaderIndex = message.readerIndex();
                Object msg = codec.decode(channel, message);
                // whenever NEED_MORE_INPUT comes back, this round of receiving is not
                // considered complete, and we wait for the next callback;
                // the buffer is first handed through a chain of codecs
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    //is it possible to go here ?
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    if (msg != null) {
                        out.add(msg);
                    }
                }
            } while (message.readable());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}

// org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}

// org.apache.dubbo.remoting.exchange.codec.ExchangeCodec
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // note: every frame starts with a header; once parsed, it tells us the frame's type and length
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    // as long as the data hasn't reached the declared length, return NEED_MORE_INPUT
    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

// org.apache.dubbo.remoting.telnet.codec.TelnetCodec
@SuppressWarnings("unchecked")
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException {
    if (isClientSide(channel)) {
        return toString(message, getCharset(channel));
    }
    checkPayload(channel, readable);
    if (message == null || message.length == 0) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    if (message[message.length - 1] == '\b') { // Windows backspace echo
        try {
            boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
            channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8},
                    getCharset(channel).name()));
        } catch (RemotingException e) {
            throw new IOException(StringUtils.toString(e));
        }
        return DecodeResult.NEED_MORE_INPUT;
    }

    for (Object command : EXIT) {
        if (isEquals(message, (byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command: "
                        + Arrays.toString((byte[]) command)));
            }
            channel.close();
            return null;
        }
    }

    boolean up = endsWith(message, UP);
    boolean down = endsWith(message, DOWN);
    if (up || down) {
        LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
        if (CollectionUtils.isEmpty(history)) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
        Integer old = index;
        if (index == null) {
            index = history.size() - 1;
        } else {
            if (up) {
                index = index - 1;
                if (index < 0) {
                    index = history.size() - 1;
                }
            } else {
                index = index + 1;
                if (index > history.size() - 1) {
                    index = 0;
                }
            }
        }
        if (old == null || !old.equals(index)) {
            channel.setAttribute(HISTORY_INDEX_KEY, index);
            String value = history.get(index);
            if (old != null && old >= 0 && old < history.size()) {
                String ov = history.get(old);
                StringBuilder buf = new StringBuilder();
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("\b");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append(" ");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("\b");
                }
                value = buf.toString() + value;
            }
            try {
                channel.send(value);
            } catch (RemotingException e) {
                throw new IOException(StringUtils.toString(e));
            }
        }
        return DecodeResult.NEED_MORE_INPUT;
    }
    for (Object command : EXIT) {
        if (isEquals(message, (byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command " + command));
            }
            channel.close();
            return null;
        }
    }
    byte[] enter = null;
    for (Object command : ENTER) {
        if (endsWith(message, (byte[]) command)) {
            enter = (byte[]) command;
            break;
        }
    }
    if (enter == null) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
    Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
    channel.removeAttribute(HISTORY_INDEX_KEY);
    if (CollectionUtils.isNotEmpty(history) && index != null && index >= 0 && index < history.size()) {
        String value = history.get(index);
        if (value != null) {
            byte[] b1 = value.getBytes();
            byte[] b2 = new byte[b1.length + message.length];
            System.arraycopy(b1, 0, b2, 0, b1.length);
            System.arraycopy(message, 0, b2, b1.length, message.length);
            message = b2;
        }
    }
    String result = toString(message, getCharset(channel));
    if (result.trim().length() > 0) {
        if (history == null) {
            history = new LinkedList<String>();
            channel.setAttribute(HISTORY_LIST_KEY, history);
        }
        if (history.isEmpty()) {
            history.addLast(result);
        } else if (!result.equals(history.getLast())) {
            history.remove(result);
            history.addLast(result);
            if (history.size() > 10) {
                history.removeFirst();
            }
        }
    }
    return result;
}
```
So dubbo's packet splitting also relies on netty: it decides whether a frame is complete by checking the declared data length.

The same length field solves packet sticking too: since the header states exactly how long the body is, we know where one message ends, and can separate messages that arrived glued together.

netty even ships several ready-made frame decoders for splitting: FixedLengthFrameDecoder, LineBasedFrameDecoder, DelimiterBasedFrameDecoder, and LengthFieldBasedFrameDecoder. The names speak for themselves. Then again, rolling your own is not hard either, so why not?
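For instance, the hand-rolled length-field decoder shown earlier collapses to one line of pipeline wiring with the built-in decoder. A sketch (the parameter values are my assumptions, matching the 4-byte length prefix used earlier, and LoggingHandler stands in for a real business handler):

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class FrameInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
                // maxFrameLength = 1 MiB; the length field sits at offset 0 and is 4 bytes wide;
                // no length adjustment; strip the 4 header bytes before passing the frame on
                .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4))
                // a stand-in for the real business handler
                .addLast("logging", new LoggingHandler(LogLevel.INFO));
    }
}
```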
That is how TCP data frames are handled on top of netty. It is also a simple application-layer protocol at work, and it gives us a more direct feel for how application-layer protocols are processed.
Of course, framing purely by data length, as above, has some limitations:

1. A very large message will block the other requests behind it;
2. A very large message will occupy a large amount of memory;
3. Within a single connection, frames cannot be transmitted out of order or interleaved. (Does TCP support out-of-order, mixed transmission? TCP itself delivers the byte stream strictly in order, so any interleaving of messages would have to be built into the application protocol.)
The protocol above makes no attempt to handle these cases. For very large requests, we can split the payload into chunks on the client side and relieve the pressure that way, as sketched below.
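A sketch of that client-side splitting (all names here are my own; in a real protocol each chunk would also carry a request id and a sequence number so that the server can reassemble them before invoking business code):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSplitter {
    // assumed per-frame payload limit; tune to the server's checkPayload setting
    static final int CHUNK_SIZE = 64 * 1024;

    /** Split one large payload into several independently framed chunks. */
    static List<byte[]> split(byte[] payload) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < payload.length; off += CHUNK_SIZE) {
            int len = Math.min(CHUNK_SIZE, payload.length - off);
            byte[] chunk = new byte[len];
            System.arraycopy(payload, off, chunk, 0, len);
            chunks.add(chunk);
        }
        return chunks;
    }
}
```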
Parting ramble: see through to the essence.