In the previous chapter, 《Netty 源碼解析系列-服務端啓動流程解析》, we walked through server startup. With the server up, where do client connection acceptance and read I/O actually begin? And once Netty's boss thread accepts a client TCP connection request, how does it register the new channel with the worker thread pool? With these questions in mind, let's trace client connection acceptance and read/write I/O.
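Before diving into the source, here is a minimal sketch of the usual two-group setup assumed throughout this article (the anonymous handler and the port are placeholders for illustration, not code from the previous article), showing where the boss group, worker group and childHandler sit:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    public final class MinimalServer {
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts connections (OP_ACCEPT)
            EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles read/write I/O (OP_READ)
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) {
                         // Runs once per accepted client channel, on a worker thread.
                         ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                             @Override
                             public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                 System.out.println("read: " + msg);
                                 ctx.fireChannelRead(msg);
                             }
                         });
                     }
                 });
                b.bind(8080).sync().channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

Now to the source. After each select, NioEventLoop's run() loop calls: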
processSelectedKeys();
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
Whether selectedKeys is null decides whether the optimized selected-key set is used. Here it is non-null, so we step into processSelectedKeysOptimized().
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        ...
    }
}
k.attachment() retrieves the attached object. Where was it attached? In the previous article, 《Netty 源碼解析-服務端啓動流程解析》, it was attached during registration, and it is simply the NioServerSocketChannel itself.
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        ...
        selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        ...
    }
}
Back to k.attachment(): after retrieving the attachment, the code checks whether it is an AbstractNioChannel. As the branch shows, if it is not an AbstractNioChannel it must be a NioTask. We only follow the AbstractNioChannel path here, into processSelectedKey().
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    ...
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            return;
        }
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }
    ...
}
當操做類型是讀操做或者鏈接操做,進入 unsafe.read() ,有兩個類實現了這個方法,一個是 AbstractNioByteChannel 的內部類 NioByteUnsafe ,一個是 AbstractNioMessageChannel 的內部類 NioMessageUnsafe ,這兩個類都是 NioUnsafe 實現類 AbstractNioChannel 的子類,那究竟是哪個子類?咱們看看 NioServerSocketChannel 建立時是建立的 NioByteUnsafe 仍是 NioMessageUnsafe。bash
public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
}
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
}
public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
    }
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }
}
NioServerSocketChannel is a subclass of AbstractNioMessageChannel, which is in turn a subclass of AbstractNioChannel. newUnsafe() is an abstract method declared on AbstractChannel, and AbstractNioMessageChannel is the class that implements it for the server channel. So the read() we end up in is the one in AbstractNioMessageChannel's inner class NioMessageUnsafe.
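The override that settles this lives in AbstractNioMessageChannel; trimmed down to the relevant part, it looks like the following excerpt:

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }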
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        ...
        for (;;) {
            int localRead = doReadMessages(readBuf);
            ...
        }
        setReadPending(false);

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        pipeline.fireChannelReadComplete();
        ...
    }
}
There are two parts here: processing the accepted messages, and propagating the events.
1. Processing the messages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    ...
    buf.add(new NioSocketChannel(this, ch));
    return 1;
    ...
}
A client SocketChannel is accepted, wrapped in a NioSocketChannel, and added to the list. Let's look at new NioSocketChannel().
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioByteUnsafe();
    }

    protected class NioByteUnsafe extends AbstractNioUnsafe {
        @Override
        public final void read() {
            ...
        }
    }
}
AbstractNioByteChannel also extends AbstractNioChannel and implements newUnsafe(). From this we can conclude: when a client first connects, the accept goes through NioMessageUnsafe.read() on the server channel (the AbstractNioMessageChannel side); when that client later sends data, the read goes through NioByteUnsafe.read(), the inner class of AbstractNioByteChannel.
2. Propagating the events
for (int i = 0; i < size; i ++) {
    pipeline.fireChannelRead(readBuf.get(i));
}
@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
Debugging next shows that the handler whose channelRead() runs here is ServerBootstrapAcceptor. If you read the previous article, 《Netty 源碼解析-服務端啓動流程解析》, you'll recall it was added in init() via pipeline.addLast(new ServerBootstrapAcceptor()). Why isn't it the initializer added with p.addLast(new ChannelInitializer())? Because ChannelInitializer.channelRegistered() removes the initChannel handler from the pipeline once it has run.
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}
Let's continue into ServerBootstrapAcceptor's channelRead().
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
There are three steps here:
(1) Add childHandler to the child channel's pipeline. Where does it come from? It's the handler set at startup via serverBootstrap.childHandler(new IOChannelInitialize()) (see the sketch after this list).
(2) Apply the configured child channel options and attributes.
(3) Register the client channel with the worker thread pool.
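The body of IOChannelInitialize isn't reproduced in this article, but judging from the handler order shown later (IdleStateHandler followed by IOHandler), it presumably looks roughly like the sketch below; the timeout value is made up for illustration:

    import java.util.concurrent.TimeUnit;

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.IdleStateHandler;

    public class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Hypothetical child pipeline: idle detection first, then the business handler.
            ch.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
            ch.pipeline().addLast(new IOHandler()); // the user handler shown near the end of this article
        }
    }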
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
@Override
public EventLoop next() {
    return (EventLoop) super.next();
}
@Override
public EventExecutor next() {
    return chooser.next();
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }
}
A thread is picked from the worker pool to perform the register.
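In other words, next() round-robins over the group's children. A quick standalone way to see this (not part of the registration path itself) is to call next() on a worker group a few times:

    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;

    public final class ChooserDemo {
        public static void main(String[] args) {
            EventLoopGroup workers = new NioEventLoopGroup(4);
            // Successive next() calls hand back the four event loops in round-robin order.
            for (int i = 0; i < 8; i++) {
                System.out.println(workers.next());
            }
            workers.shutdownGracefully();
        }
    }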
@Override
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    ...
    channel.unsafe().register(this, promise);
    return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}
@Override
protected void doRegister() throws Exception {
    ...
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    ...
}
The rest of the flow is the same as the registration flow in 《Netty 源碼解析-服務端啓動流程解析》. The difference is that at server startup the registration task runs on a boss thread's task queue, whereas for a newly accepted client the register0() task runs on a worker thread's task queue, and the channel is registered with that worker thread's selector through Java NIO. At this point we can answer two of the opening questions: how a client connection is accepted, and how the boss thread hands the new channel over to the worker thread pool after accepting a TCP connection. One question remains: where do read/write I/O events start?
Let's go back to the beginning of the article.
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        ...
    }
}
Earlier, the boss thread completed the client's connection acceptance here, posted the registration to a worker thread's task queue, and registered interest in read events. Now the worker thread keeps looping over selectedKeys looking for pending events; when one is ready, processSelectedKey() runs.
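Stripped of Netty's specifics, the worker's polling loop in NioEventLoop.run() boils down to something like this plain-Java sketch (illustrative only, not Netty's actual implementation):

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.util.Iterator;

    final class WorkerLoopSketch implements Runnable {
        private final Selector selector;

        WorkerLoopSketch(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            for (;;) {
                try {
                    selector.select(1000);                         // wait for registered channels to become ready
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isReadable()) {
                            // Netty would call unsafe.read() here and fire channelRead through the pipeline.
                        }
                    }
                    // Netty also drains its task queue here (runAllTasks), e.g. the register0() task.
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }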
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    ...
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        ...
    }
    ...
}
This time unsafe.read() resolves to the read() of NioByteUnsafe, the inner class of AbstractNioByteChannel.
@Override
public final void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // not was read release the buffer
                byteBuf.release();
                byteBuf = null;
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
}
Breaking this long method into parts:
1. Loop over reads, at most maxMessagesPerRead times (16 by default); anything not yet read waits for the next select round.
2. Obtain the receive-buffer allocator handle via config.getRecvByteBufAllocator().newHandle().
3. Allocate a buffer with allocHandle.allocate(allocator).
4. Read data from the socket into the ByteBuf via doReadBytes(byteBuf).
5. Propagate the read event to the next handler with pipeline.fireChannelRead(byteBuf).
6. After the loop, fire the read-complete event to the next handler with pipeline.fireChannelReadComplete() (see the short demo after this list). Here we only follow the read event; the other details will be covered in later articles.
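To make steps 5 and 6 concrete, the snippet below uses EmbeddedChannel purely for demonstration (the real transport is the NIO socket channel traced above) to show how channelRead and channelReadComplete arrive at a handler:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.util.CharsetUtil;

    public final class ReadEventDemo {
        public static void main(String[] args) {
            EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    // Step 5: each chunk read from the socket arrives here as a ByteBuf.
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("channelRead: " + buf.toString(CharsetUtil.UTF_8));
                    buf.release();
                }

                @Override
                public void channelReadComplete(ChannelHandlerContext ctx) {
                    // Step 6: fired once after the read loop finishes.
                    System.out.println("channelReadComplete");
                }
            });
            ch.writeInbound(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
            ch.finish();
        }
    }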
@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
The handlers fire in this order:
HeadContextHandler --> IdleStateHandler --> IOHandler --> TailContext
private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
Stepping into IdleStateHandler:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}
It records that a read is in progress (reading = true), which the idle-state detection later relies on.
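As background, the idle events that this bookkeeping feeds are normally consumed by user code via userEventTriggered(); a minimal, hypothetical consumer (not from this article's code) looks like:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleStateEvent;

    public class IdleEventHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // Fired by IdleStateHandler when no read/write happened within the configured timeout.
                System.out.println("idle: " + ((IdleStateEvent) evt).state());
                ctx.close();
            } else {
                ctx.fireUserEventTriggered(evt);
            }
        }
    }

Back to the flow: after setting its flags, IdleStateHandler forwards the read event down the pipeline, so the next handler to run is IOHandler's channelRead().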
public class IOHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        System.out.println(msg.toString());
    }
    ...
}
The read event is finally handed to the user-defined handler. With that, we've traced where read I/O events start and how they reach the user's handler.
To summarize:
1. The boss thread handles the NioServerSocketChannel's accept event and hands the accepted client channel to a worker's task queue; the queued task runs register0(), registering interest in read events with that worker thread's selector.
2. The worker thread polls its selected keys; when a read event arrives, it reads the buffered data and delivers it to the user's handler.