Netty源碼分析 (八)----- write過程 源碼分析

上一篇文章主要講了netty的read過程,本文主要分析一下write和writeAndFlush。html

主要內容

本文分如下幾個部分闡述一個java對象最後是如何轉變成字節流,寫到socket緩衝區中去的java

  1. pipeline中的標準鏈表結構
  2. java對象編碼過程
  3. write:寫隊列
  4. flush:刷新寫隊列
  5. writeAndFlush: 寫隊列並刷新

pipeline中的標準鏈表結構

一個標準的pipeline鏈式結構以下編程

數據從head節點流入,先拆包,而後解碼成業務對象,最後通過業務Handler處理,調用write,將結果對象寫出去。而寫的過程先經過tail節點,而後經過encoder節點將對象編碼成ByteBuf,最後將該ByteBuf對象傳遞到head節點,調用底層的Unsafe寫到jdk底層管道

java對象編碼過程

爲何咱們在pipeline中添加了encoder節點,java對象就轉換成netty能夠處理的ByteBuf,寫到管道里?promise

咱們先看下調用write的code緩存

BusinessHandler數據結構

protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception { Response response = doBusiness(request); if (response != null) {  ctx.channel().write(response); } }

業務處理器接受到請求以後,作一些業務處理,返回一個Response,而後,response在pipeline中傳遞,落到 Encoder節點,咱們來跟蹤一下 ctx.channel().write(response);併發

public ChannelFuture write(Object msg) { return this.pipeline.write(msg); }

調用了Channel中的pipeline中的write方法,咱們接着看socket

public final ChannelFuture write(Object msg) { return this.tail.write(msg); }

pipeline中有屬性tail,調用tail中的write,由此咱們知道write消息的時候,從tail開始,接着往下看ide

private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = this.findContextOutbound(); Object m = this.pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else {  next.invokeWrite(m, promise); } } else { Object task; if (flush) { task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise); } else { task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise); } safeExecute(executor, (Runnable)task, promise, m); } }

中間我省略了幾個重載的方法,咱們來看看第一行代碼,next = this.findContextOutbound();oop

private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while(!ctx.outbound); return ctx; }

經過 ctx = ctx.prev; 咱們知道從tail開始找到pipeline中的第一個outbound的handler,而後調用 invokeWrite(m, promise),此時找到的第一個outbound的handler就是咱們自定義的編碼器Encoder

咱們接着看 next.invokeWrite(m, promise);

private void invokeWrite(Object msg, ChannelPromise promise) { if (this.invokeHandler()) { this.invokeWrite0(msg, promise); } else { this.write(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler)this.handler()).write(this, msg, promise); } catch (Throwable var4) { notifyOutboundHandlerException(var4, promise); } }

一路代碼跟下來,咱們能夠知道是調用了第一個outBound類型的handler中的write方法,也就是第一個調用的是咱們自定義編碼器Encoder的write方法

咱們來看看自定義Encoder

public class Encoder extends MessageToByteEncoder<Response> { @Override protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception { out.writeByte(response.getVersion()); out.writeInt(4 + response.getData().length); out.writeBytes(response.getData()); } }

自定義Encoder繼承 MessageToByteEncoder ,而且重寫了 encode方法,這就是編碼器的核心,咱們先來看 MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {

咱們看到 MessageToByteEncoder 繼承了 ChannelOutboundHandlerAdapter,說明了 Encoder 是一個 Outbound的handler

咱們來看看 Encoder 的父類 MessageToByteEncoder中的write方法

MessageToByteEncoder

@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { // 判斷當前Handelr是否能處理寫入的消息
        if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") // 強制換換
            I cast = (I) msg; // 分配一段ButeBuf
           buf = allocateBuffer(ctx, cast, preferDirect); try { // 調用encode,這裏就調回到 `Encoder` 這個Handelr中 
 encode(ctx, cast, buf); } finally { // 既然自定義java對象轉換成ByteBuf了,那麼這個對象就已經無用了,釋放掉 // (當傳入的msg類型是ByteBuf的時候,就不須要本身手動釋放了)
 ReferenceCountUtil.release(cast); } // 若是buf中寫入了數據,就把buf傳到下一個節點
            if (buf.isReadable()) { ctx.write(buf, promise); } else { // 不然,釋放buf,將空數據傳到下一個節點   buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { // 若是當前節點不能處理傳入的對象,直接扔給下一個節點處理
 ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { // 當buf在pipeline中處理完以後,釋放
        if (buf != null) { buf.release(); } } }

這裏,咱們詳細闡述一下Encoder是如何處理傳入的java對象的

1.判斷當前Handler是否能處理寫入的消息,若是能處理,進入下面的流程,不然,直接扔給下一個節點處理
2.將對象強制轉換成Encoder能夠處理的 Response對象
3.分配一個ByteBuf
4.調用encoder,即進入到 Encoderencode方法,該方法是用戶代碼,用戶將數據寫入ByteBuf
5.既然自定義java對象轉換成ByteBuf了,那麼這個對象就已經無用了,釋放掉,(當傳入的msg類型是ByteBuf的時候,就不須要本身手動釋放了)
6.若是buf中寫入了數據,就把buf傳到下一個節點,不然,釋放buf,將空數據傳到下一個節點
7.最後,當buf在pipeline中處理完以後,釋放節點

總結一點就是,Encoder節點分配一個ByteBuf,調用encode方法,將java對象根據自定義協議寫入到ByteBuf,而後再把ByteBuf傳入到下一個節點,在咱們的例子中,最終會傳入到head節點,由於head節點是一個OutBount類型的handler

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }

這裏的msg就是前面在Encoder節點中,載有java對象數據的自定義ByteBuf對象,進入下一節

write:寫隊列

咱們來看看channel中unsafe的write方法,先來看看其中的一個屬性

AbstractUnsafe

protected abstract class AbstractUnsafe implements Unsafe { private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

咱們來看看 ChannelOutboundBuffer 這個類

public final class ChannelOutboundBuffer { private final Channel channel; private ChannelOutboundBuffer.Entry flushedEntry; private ChannelOutboundBuffer.Entry unflushedEntry; private ChannelOutboundBuffer.Entry tailEntry;

ChannelOutboundBuffer內部維護了一個Entry鏈表,並使用Entry封裝msg。其中的屬性咱們下面會詳細講

咱們回到正題,接着看 unsafe.write(msg, promise);

AbstractUnsafe

@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop();  ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; int size; try {  msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; }  outboundBuffer.addMessage(msg, size, promise); }

1.調用 filterOutboundMessage() 方法,將待寫入的對象過濾,把非ByteBuf對象和FileRegion過濾,把全部的非直接內存轉換成直接內存DirectBuffer

@Override protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }

2.接下來,估算出須要寫入的ByteBuf的size
3.最後,調用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,因此,接下來,咱們須要重點看一下這個方法幹了什麼事情

ChannelOutboundBuffer

public void addMessage(Object msg, int size, ChannelPromise promise) { // 建立一個待寫出的消息節點
    Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(size, false); }

想要理解上面這段代碼,必須得掌握寫緩存中的幾個消息指針,以下圖

ChannelOutboundBuffer 裏面的數據結構是一個單鏈表結構,每一個節點是一個 EntryEntry 裏面包含了待寫出ByteBuf 以及消息回調 promise,下面分別是三個指針的做用

1.flushedEntry 指針表示第一個被寫到操做系統Socket緩衝區中的節點
2.unFlushedEntry 指針表示第一個未被寫入到操做系統Socket緩衝區中的節點
3.tailEntry指針表示ChannelOutboundBuffer緩衝區的最後一個節點

初次調用 addMessage 以後,各個指針的狀況爲

fushedEntry指向空,unFushedEntry和 tailEntry 都指向新加入的節點

第二次調用 addMessage以後,各個指針的狀況爲

第n次調用 addMessage以後,各個指針的狀況爲

能夠看到,調用n次addMessage,flushedEntry指針一直指向NULL,表示如今還未有節點須要寫出到Socket緩衝區,而unFushedEntry以後有n個節點,表示當前還有n個節點還沒有寫出到Socket緩衝區中去

flush:刷新寫隊列

無論調用channel.flush(),仍是ctx.flush(),最終都會落地到pipeline中的head節點

HeadContext

@Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }

以後進入到AbstractUnsafe

AbstractUnsafe

public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; }  outboundBuffer.addFlush();  flush0(); }

flush方法中,先調用 outboundBuffer.addFlush();

ChannelOutboundBuffer

public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; } }

能夠結合前面的圖來看,首先拿到 unflushedEntry 指針,而後將 flushedEntry 指向unflushedEntry所指向的節點,調用完畢以後,三個指針的狀況以下所示

 

至關於全部的節點都即將開始推送出去

接下來,調用 flush0();

AbstractUnsafe

protected void flush0() { doWrite(outboundBuffer); }

發現這裏的核心代碼就一個 doWrite,繼續跟

AbstractNioByteChannel

protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { // 拿到第一個須要flush的節點的數據
       Object msg = in.current(); if (msg instanceof ByteBuf) { // 強轉爲ByteBuf,若發現沒有數據可讀,直接刪除該節點
            ByteBuf buf = (ByteBuf) msg; boolean done = false; long flushedAmount = 0; // 拿到自旋鎖迭代次數
            if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } // 自旋,將當前節點寫出
            for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // 寫完以後,將當前節點刪除
            if (done) { in.remove(); } else { break; } } } }

這裏略微有點複雜,咱們分析一下

1.第一步,調用current()先拿到第一個須要flush的節點的數據

 ChannelOutBoundBuffer

public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }

2.第二步,拿到自旋鎖的迭代次數

if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); }

3.自旋的方式將ByteBuf寫出到jdk nio的Channel

for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } }

doWriteBytes 方法跟進去

protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }

咱們發現,出現了 javaChannel(),代表已經進入到了jdk nio Channel的領域,咱們來看看 buf.readBytes(javaChannel(), expectedWrittenBytes);

public int readBytes(GatheringByteChannel out, int length) throws IOException { this.checkReadableBytes(length); int readBytes = this.getBytes(this.readerIndex, out, length); this.readerIndex += readBytes; return readBytes; }

咱們來看關鍵代碼 this.getBytes(this.readerIndex, out, length)

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { this.checkIndex(index, length); if (length == 0) { return 0; } else { ByteBuffer tmpBuf; if (internal) { tmpBuf = this.internalNioBuffer(); } else { tmpBuf = ((ByteBuffer)this.memory).duplicate(); } index = this.idx(index); tmpBuf.clear().position(index).limit(index + length); //將tmpBuf中的數據寫到out中
        return out.write(tmpBuf); } }

咱們來看看out.write(tmpBuf)

public int write(ByteBuffer src) throws IOException { ensureOpen(); if (!writable) throw new NonWritableChannelException(); synchronized (positionLock) { int n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return 0; do { n = IOUtil.write(fd, src, -1, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert IOStatus.check(n); } } }

和read實現同樣,SocketChannelImpl的write方法經過IOUtil的write實現:關鍵代碼 n = IOUtil.write(fd, src, -1, nd);

static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException { //若是是DirectBuffer,直接寫,將堆外緩存中的數據拷貝到內核緩存中進行發送
    if (var1 instanceof DirectBuffer) { return writeFromNativeBuffer(var0, var1, var2, var4); } else { //非DirectBuffer //獲取已經讀取到的位置
        int var5 = var1.position(); //獲取能夠讀到的位置
        int var6 = var1.limit(); assert var5 <= var6; //申請一個原buffer可讀大小的DirectByteBuffer
        int var7 = var5 <= var6 ? var6 - var5 : 0; ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7); int var10; try { var8.put(var1); var8.flip(); var1.position(var5); //經過DirectBuffer寫,將堆外緩存的數據拷貝到內核緩存中進行發送
            int var9 = writeFromNativeBuffer(var0, var8, var2, var4); if (var9 > 0) { var1.position(var5 + var9); } var10 = var9; } finally { //回收分配的DirectByteBuffer
 Util.offerFirstTemporaryDirectBuffer(var8); } return var10; } }

代碼邏輯咱們就再也不講了,代碼註釋已經很清楚了,這裏咱們關注一點,咱們能夠看看咱們前面的一個方法 filterOutboundMessage(),將待寫入的對象過濾,把非ByteBuf對象和FileRegion過濾,把全部的非直接內存轉換成直接內存DirectBuffer

說明到了這一步全部的 var1 意境是直接內存DirectBuffer,就不須要走到else,就不須要write兩次了

4.刪除該節點

節點的數據已經寫入完畢,接下來就須要刪除該節點

ChannelOutBoundBuffer

public boolean remove() {  Entry e = flushedEntry; Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); } // recycle the entry
 e.recycle(); return true; }

首先拿到當前被flush掉的節點(flushedEntry所指),而後拿到該節點的回調對象 ChannelPromise, 調用 removeEntry()方法移除該節點

private void removeEntry(Entry e) { if (-- flushed == 0) { flushedEntry = null; if (e == tailEntry) { tailEntry = null; unflushedEntry = null; } } else {  flushedEntry = e.next; } }

這裏的remove是邏輯移除,只是將flushedEntry指針移到下個節點,調用完畢以後,節點圖示以下

writeAndFlush: 寫隊列並刷新

理解了write和flush這兩個過程,writeAndFlush 也就不難了

public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); } public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { write(msg, true, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } }

能夠看到,最終,經過一個boolean變量,表示是調用 invokeWriteAndFlush,仍是 invokeWriteinvokeWrite即是咱們上文中的write過程

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { invokeWrite0(msg, promise); invokeFlush0(); }

能夠看到,最終調用的底層方法和單獨調用 write 和 flush 是同樣的

private void invokeWrite(Object msg, ChannelPromise promise) { invokeWrite0(msg, promise); } private void invokeFlush(Object msg, ChannelPromise promise) { invokeFlush0(msg, promise); }

由此看來,invokeWriteAndFlush基本等價於write方法以後再來一次flush

總結

1.pipeline中的編碼器原理是建立一個ByteBuf,將java對象轉換爲ByteBuf,而後再把ByteBuf繼續向前傳遞
2.調用write方法並無將數據寫到Socket緩衝區中,而是寫到了一個單向鏈表的數據結構中,flush纔是真正的寫出
3.writeAndFlush等價於先將數據寫到netty的緩衝區,再將netty緩衝區中的數據寫到Socket緩衝區中,寫的過程與併發編程相似,用自旋鎖保證寫成功
4.netty中的緩衝區中的ByteBuf爲DirectByteBuf

 

 

 

原文出處:https://www.cnblogs.com/java-chen-hao/p/11477385.html

相關文章
相關標籤/搜索