Netty 源碼 Channel(二)核心類
[toc]html
Netty 系列目錄(https://www.cnblogs.com/binarylei/p/10117436.html)java
相關文章:網絡
1. Channel 類圖
2. AbstractChannel
2.1 幾個重要屬性
// SocketChannel 的 parent 是 ServerSocketChannel private final Channel parent; // 惟一標識 private final ChannelId id; // Netty 內部使用 private final Unsafe unsafe; // pipeline private final DefaultChannelPipeline pipeline; // 綁定的線程 private volatile EventLoop eventLoop; protected AbstractChannel(Channel parent, ChannelId id) { this.parent = parent; this.id = id; unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
2.2 核心 API
read、write、connect、bind 都委託給了 pipeline 處理。app
3. AbstractNioChannel
3.1 幾個重要屬性
// NIO 底層 Channel private final SelectableChannel ch; // 感興趣的事件 protected final int readInterestOp; // 綁定的 SelectionKey,當 selectionKey 修改後其它線程能夠感知 volatile SelectionKey selectionKey;
3.2 核心 API
(1) doRegisteride
將 channel 註冊到 eventLoop 線程上,此時統一註冊的感興趣的事件類型爲 0。oop
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 1. 將 channel 註冊到 eventLoop 線程上 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // 2. 對註冊失敗的 channel,JDK 將在下次 select 將其刪除 // 然而此時尚未調用 select,固然也能夠調用 selectNow 強刪 eventLoop().selectNow(); selected = true; } else { // 3. JDK API 描述不會有異常,實際上... throw e; } } } }
(2) doBeginReadthis
doBeginRead 只作了一件事就是註冊 channel 感興趣的事件。此至就能夠監聽網絡事件了。url
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
4. AbstractNioByteChannel
AbstractNioByteChannel 中最重要的方法是 doWrite,咱們一塊兒來看一下:spa
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { // 1. spin 是自旋的意思,也就是最多循環的次數 int writeSpinCount = config().getWriteSpinCount(); do { // 2. 從 ChannelOutboundBuffer 彈出一條消息 Object msg = in.current(); if (msg == null) { // 3. 寫完了就要清除半包標記 clearOpWrite(); // 4. 直接返回,不調用 incompleteWrite 方法 return; } // 5. 正確處理了一條 msg 消息,循環次數就減 1 writeSpinCount -= doWriteInternal(in, msg); } while (writeSpinCount > 0); // 6. writeSpinCount < 0 認爲有半包須要繼續處理 incompleteWrite(writeSpinCount < 0); }
爲何要設置最大自旋次數,一次把 ChannelOutboundBuffer 中的全部 msg 處理完了不是更好嗎?若是不設置的話,線程會一直嘗試進行網絡 IO 寫操做,此時線程沒法處理其它網絡 IO 事件,可能致使線程假死。.net
下面咱們看一下 msg 消息是如何處理的,這裏以 ByteBuf 消息爲例:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; // 1. 不可讀則丟棄這條消息,繼續處理下一條消息 if (!buf.isReadable()) { in.remove(); return 0; } // 2. 由具體的子類重寫 doWriteBytes 方法,返回處理了多少字節 final int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount > 0) { // 3. 更新進度 in.progress(localFlushedAmount); if (!buf.isReadable()) { in.remove(); } return 1; } // 文件處理,這裏略過,相似 ByteBuf } else if (msg instanceof FileRegion) { // 省略 ... } else { throw new Error(); } return WRITE_STATUS_SNDBUF_FULL; // WRITE_STATUS_SNDBUF_FULL=Integer.MAX_VALUE }
doWriteBytes 進行消息發送,它是一個抽象方法,由具體的子類實現。若是本次發送的字節數爲 0,說明發送的 TCP 緩衝區已滿,發生了 ZERO_WINDOW。此時再次發送可能還是 0,空循環會佔用 CPU 資源。所以返回 Integer.MAX_VALUE。直接退出循環,設置半包標識,下次繼續處理。
// 沒有寫完,有兩種狀況: // 一是 TCP 緩衝區已滿,doWriteBytes 定入 0 個字節,致使 doWriteInternal 返回 Integer.MAX_VALUE, // 這時設置了半包標識,會自動輪詢寫事件 // 二是自旋的次數已到,將線程交給其它任務執行,未寫完的數據經過 flushTask 繼續寫 protected final void incompleteWrite(boolean setOpWrite) { // Did not write completely. if (setOpWrite) { setOpWrite(); } else { // Schedule flush again later so other tasks can be picked up in the meantime Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
最後咱們來看一下半包是如何處理的,能夠看到所謂的半包標記其實就是是否取 OP_WRITE 事件。
protected final void clearOpWrite() { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } } protected final void setOpWrite() { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } }
5. AbstractNioMessageChannel
AbstractNioMessageChannel#doWrite 方法和 AbstractNioByteChannel#doWrite 相似,前者能夠寫 POJO 對象,後者只能寫 ByteBuf 和 FileRegion。
6. NioServerSocketChannel
NioServerSocketChannel 經過 doReadMessages 接收客戶端的鏈接請求:
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } return 0; }
7. NioSocketChannel
(1) doConnect
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
鏈接時有三種狀況:
- 直接就鏈接成功,返回 true
- 若是沒有鏈接成功,就註冊 OP_CONNECT 事件進行監聽,返回 false
- 發生異常
(2) doWriteBytes
向 ServerSocket 中寫入數據。
@Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
(3) doReadBytes
從 ServerSocket 中讀取數據。
@Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
最底層仍是調用 Channel 的 read 方法。
// AbstractByteBuf#writeBytes public int writeBytes(ScatteringByteChannel in, int length) throws IOException { ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; } return writtenBytes; } // UnpooledHeapByteBuf#setBytes public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { ensureAccessible(); try { return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)); } catch (ClosedChannelException ignored) { return -1; } }
天天用心記錄一點點。內容也許不重要,但習慣很重要!