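Returning data to the client takes three steps:
1. Allocate a buffer (buf) and write the data into it.
2. Store the buf in the ChannelOutboundBuffer.
3. Write the bufs held in the ChannelOutboundBuffer out to the socketChannel.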
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ReferenceCountUtil.release(msg);

    ByteBuf buf1 = ctx.alloc().buffer(4);
    buf1.writeInt(1);
    ByteBuf buf2 = ctx.alloc().buffer(4);
    buf2.writeInt(2);
    ByteBuf buf3 = ctx.alloc().buffer(4);
    buf3.writeInt(3);

    ctx.write(buf1);
    ctx.write(buf2);
    ctx.write(buf3);
    ctx.flush();
}
Why does the buf have to be stored in the ChannelOutboundBuffer first?
Implementation of ctx.write():
//AbstractChannelHandlerContext.java
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, promise);
        } else {
            task = WriteTask.newInstance(next, msg, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}
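By default, that is, when no other outbound handler sits between the current handler and the head of the pipeline, findContextOutbound() returns the pipeline's head node, and the head's write method is invoked.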
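The lookup can be pictured as a backward walk over a doubly linked list of contexts. The following is a minimal sketch under that assumption. `PipelineSketch`, `Ctx`, and `link` are hypothetical names for illustration, not Netty classes.

```java
// Simplified model of a pipeline; these are not Netty's actual classes.
class PipelineSketch {
    static final class Ctx {
        final String name;
        final boolean outbound; // true if the handler processes outbound events
        Ctx prev;               // neighbor closer to head
        Ctx next;               // neighbor closer to tail

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        // Walk toward head until an outbound-capable context is found.
        // Assumes head is outbound-capable, so the walk always terminates
        // when started from any context other than head itself.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    // Links the given contexts in order, head first.
    static void link(Ctx... ctxs) {
        for (int i = 1; i < ctxs.length; i++) {
            ctxs[i - 1].next = ctxs[i];
            ctxs[i].prev = ctxs[i - 1];
        }
    }

    public static void main(String[] args) {
        Ctx head = new Ctx("head", true);
        Ctx decoder = new Ctx("decoder", false);
        Ctx business = new Ctx("business", false);
        link(head, decoder, business);
        // No outbound handler between "business" and "head", so head is found.
        System.out.println(business.findContextOutbound().name);
    }
}
```

If an outbound handler such as an encoder were linked between head and the business handler, the walk would stop there first, which is why the "head node" result only holds for the default case.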
//HeadContext.java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

//AbstractUnsafe
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}
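The outboundBuffer is instantiated together with the Unsafe object. In the end, msg is handed to outboundBuffer, which wraps it.
ChannelOutboundBuffer maintains a linked list of Entry objects internally and wraps each msg in an Entry. Its main fields are:
1. unflushedEntry: points to the first entry that has not yet been marked for flushing (the head of the list as long as nothing has been flushed)
2. tailEntry: points to the tail of the list
3. totalPendingSize: the total number of bytes of the pending msgs
4. unwritable: the not-writable flag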
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(size, false);
}
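Entry instances are obtained through Entry.newInstance. Netty pools Entry objects: once an Entry has been used, it is cleared and recycled. Is that because instantiating an Entry is expensive? More likely the point is that one Entry is needed for every outbound message, so reusing instances cuts allocation and GC pressure on a hot path.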
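The pooling idea can be sketched as below. Netty's Entry relies on its own Recycler utility. This sketch swaps in a plain single-threaded `ArrayDeque` pool for illustration, and `EntryPoolSketch` with its reduced `Entry` is a hypothetical stand-in.

```java
import java.util.ArrayDeque;

// Minimal single-threaded object pool; an illustration of the recycling idea only.
class EntryPoolSketch {
    static final class Entry {
        Object msg;
        int size;

        // Clear all fields, then hand the instance back to the pool.
        void recycle() {
            msg = null;
            size = 0;
            POOL.push(this);
        }
    }

    // Assumption: accessed from one thread only (Netty's Recycler is thread-aware).
    private static final ArrayDeque<Entry> POOL = new ArrayDeque<>();

    // Reuse a pooled instance when one is available, otherwise allocate.
    static Entry newInstance(Object msg, int size) {
        Entry entry = POOL.poll();
        if (entry == null) {
            entry = new Entry();
        }
        entry.msg = msg;
        entry.size = size;
        return entry;
    }

    public static void main(String[] args) {
        Entry first = newInstance("a", 1);
        first.recycle();
        Entry second = newInstance("b", 1);
        System.out.println(first == second); // the recycled instance is reused
    }
}
```

Clearing the fields before pooling matters: a recycled Entry that still referenced its old msg would keep that buffer reachable.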
A new entry is always appended to the tail of the list, and tailEntry is updated to point to it.
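Stripped of sizes and promises, the list bookkeeping looks roughly like this. It is a reduced sketch that mirrors the field names from the excerpt. `OutboundQueueSketch` itself is a hypothetical class.

```java
// Reduced model of the Entry list; sizes, promises, and recycling are omitted.
class OutboundQueueSketch {
    static final class Entry {
        final Object msg;
        Entry next;

        Entry(Object msg) {
            this.msg = msg;
        }
    }

    Entry flushedEntry;   // first entry already marked for flushing
    Entry unflushedEntry; // first entry not yet marked for flushing
    Entry tailEntry;      // last entry in the list

    void addMessage(Object msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            // The list was empty, so nothing can be marked as flushed.
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            // First message since the last flush marks the unflushed start.
            unflushedEntry = entry;
        }
    }

    public static void main(String[] args) {
        OutboundQueueSketch queue = new OutboundQueueSketch();
        queue.addMessage("buf1");
        queue.addMessage("buf2");
        queue.addMessage("buf3");
        System.out.println(queue.unflushedEntry.msg + " ... " + queue.tailEntry.msg);
    }
}
```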
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}
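incrementPendingOutboundBytes atomically adds size to the totalPendingSize field (through a CAS-based field updater) and then checks whether the new total has reached the writeBufferHighWaterMark threshold, which defaults to 65536 bytes (64 KiB). If totalPendingSize >= 65536, unwritable is set to 1 with a CAS and a ChannelWritabilityChanged event is fired.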
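A minimal sketch of this bookkeeping follows, assuming a fixed high-water mark passed to the constructor. `WatermarkSketch` and its event counter are hypothetical. The real code fires the event through the pipeline and also handles the low-water mark on the way down, which is left out here.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Tracks pending bytes and flips an "unwritable" flag at the high-water mark.
class WatermarkSketch {
    private static final AtomicLongFieldUpdater<WatermarkSketch> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(WatermarkSketch.class, "totalPendingSize");
    private static final AtomicIntegerFieldUpdater<WatermarkSketch> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(WatermarkSketch.class, "unwritable");

    private volatile long totalPendingSize;
    private volatile int unwritable;
    private final long highWaterMark;

    // Stand-in for firing ChannelWritabilityChanged; only safe single-threaded.
    int writabilityChangedEvents;

    WatermarkSketch(long highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    void incrementPendingOutboundBytes(long size) {
        if (size == 0) {
            return;
        }
        long newSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newSize >= highWaterMark) {
            // Only the 0 -> 1 transition counts as a writability change.
            if (UNWRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
                writabilityChangedEvents++;
            }
        }
    }

    boolean isWritable() {
        return unwritable == 0;
    }

    public static void main(String[] args) {
        WatermarkSketch tracker = new WatermarkSketch(65536);
        tracker.incrementPendingOutboundBytes(65536);
        System.out.println(tracker.isWritable()); // false once the mark is reached
    }
}
```

The CAS on the flag keeps the event from firing again on every later write while the buffer stays above the threshold.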
At this point, all of the buf data has been stored in outboundBuffer.
Implementation of ctx.flush():
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }

    return this;
}
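As with write, findContextOutbound() returns the pipeline's head node by default (no other outbound handler in between), and the head's flush method is invoked.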
//HeadContext.java
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

//AbstractUnsafe
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}
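addFlush marks the msgs added during the write phase as flushed. The marking defines the boundary of the current flush: only the entries marked here are written to the socket, while messages written after this flush() call stay unflushed until the next flush.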
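The effect of the marking can be illustrated with a small model that uses a list plus a counter in place of the Entry pointers. `FlushMarkSketch` and `drainFlushed` are hypothetical names, and the real buffer moves an unflushedEntry/flushedEntry pair rather than counting.

```java
import java.util.ArrayList;
import java.util.List;

// Models the write/flush boundary: only entries marked by addFlush() are drained.
class FlushMarkSketch {
    private final List<String> entries = new ArrayList<>();
    private int flushed; // number of leading entries marked as flushed

    void write(String msg) {
        entries.add(msg);
    }

    // Mark everything written so far as eligible for the next socket write.
    void addFlush() {
        flushed = entries.size();
    }

    // Remove and return only the entries that were marked as flushed.
    List<String> drainFlushed() {
        List<String> out = new ArrayList<>(entries.subList(0, flushed));
        entries.subList(0, flushed).clear();
        flushed = 0;
        return out;
    }

    public static void main(String[] args) {
        FlushMarkSketch buffer = new FlushMarkSketch();
        buffer.write("buf1");
        buffer.write("buf2");
        buffer.addFlush();
        buffer.write("buf3"); // written after the flush mark
        System.out.println(buffer.drainFlushed()); // buf3 is not included
    }
}
```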
Now look at the flush0 method:
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}

private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
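1. If OP_WRITE is set in the selectionKey's interest ops, an earlier write could not be completed and the event loop will call forceFlush() once the socket becomes writable again, so the method returns immediately.
2. Otherwise the flush is performed right away.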
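The check itself is plain bit arithmetic on the interest set. Here is a small sketch using the JDK's `SelectionKey.OP_WRITE` constant on raw int values. `FlushPendingSketch` and its helper methods are hypothetical and work without a real selector.

```java
import java.nio.channels.SelectionKey;

// Bit operations behind "is a flush already pending?", on plain int interest sets.
class FlushPendingSketch {
    // A flush is pending when the key is valid and OP_WRITE is registered.
    static boolean isFlushPending(boolean keyValid, int interestOps) {
        return keyValid && (interestOps & SelectionKey.OP_WRITE) != 0;
    }

    // Register interest in writability (done after an incomplete write).
    static int setOpWrite(int interestOps) {
        return interestOps | SelectionKey.OP_WRITE;
    }

    // Drop interest in writability (done once everything has been written).
    static int clearOpWrite(int interestOps) {
        return interestOps & ~SelectionKey.OP_WRITE;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ;
        System.out.println(isFlushPending(true, ops));             // no OP_WRITE yet
        System.out.println(isFlushPending(true, setOpWrite(ops))); // OP_WRITE registered
    }
}
```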
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}

public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}
1. If the socketChannel is closed or not connected, the flushed entries are failed.
2. Otherwise doWrite is called to write the data to the socketChannel.
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }

        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            incompleteWrite(setOpWrite);
            break;
        }
    }
}
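1. size() returns the number of flushed Entry instances in outboundBuffer.
2. in.nioBuffers() converts the ByteBuf msgs held by the Entry objects into NIO ByteBuffer instances and returns them as the array nioBuffers. A msg and its ByteBuffer refer to the same memory; UnpooledDirectByteBuf, for example, already holds a ByteBuffer instance internally.
3. socketChannel.write() writes the data in nioBuffers to the socket; this part is implemented by the JDK's NIO.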
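The bounded retry loop inside doWrite can be tried without a real socket. The sketch below covers the single-buffer case, assuming a hypothetical `ThrottledChannel` that accepts only a few bytes per call to mimic a nearly full socket send buffer. `SpinWriteSketch` and `writeWithSpin` are illustrative names as well.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Bounded write loop in the style of doWrite's single-buffer case.
class SpinWriteSketch {
    // Hypothetical channel that accepts at most `limit` bytes per write call.
    static final class ThrottledChannel implements WritableByteChannel {
        private final int limit;
        int totalWritten;

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            src.position(src.position() + n); // consume n bytes from the buffer
            totalWritten += n;
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    // Returns true if buf was fully written within spinCount attempts.
    static boolean writeWithSpin(WritableByteChannel ch, ByteBuffer buf, int spinCount) {
        for (int i = spinCount - 1; i >= 0; i--) {
            int written;
            try {
                written = ch.write(buf);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (written == 0) {
                return false; // channel not writable; a caller would register OP_WRITE
            }
            if (!buf.hasRemaining()) {
                return true;
            }
        }
        return false; // spin budget used up; a caller would schedule another flush
    }

    public static void main(String[] args) {
        ThrottledChannel ch = new ThrottledChannel(4);
        ByteBuffer buf = ByteBuffer.allocate(12); // three ints' worth of bytes
        System.out.println(writeWithSpin(ch, buf, 16) + " " + ch.totalWritten);
    }
}
```

Capping the number of attempts keeps one slow connection from monopolizing the event loop thread; whatever remains is picked up by a later flush.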
At this point, the data in nioBuffers has been flushed to the socket, and the client can receive it.