認真的 Netty 源碼解析(二)

Channel 的 register 操做

通過前面的鋪墊,咱們已經具有必定的基礎了,咱們開始來把前面學到的內容揉在一塊兒。這節,咱們會介紹 register 操做,這一步實際上是很是關鍵的,對於咱們源碼分析很是重要。java

register

咱們從 EchoClient 中的 connect() 方法出發,或者 EchoServer 的 bind(port) 方法出發,都會走到 initAndRegister() 這個方法:redis

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1
channel = channelFactory.newChannel();
// 2 對於 Bootstrap 和 ServerBootstrap,這裏面有些不同
init(channel);
} catch (Throwable t) {
...
}
// 3 咱們這裏要說的是這行
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1
channel = channelFactory.newChannel();
// 2 對於 Bootstrap 和 ServerBootstrap,這裏面有些不同
init(channel);
} catch (Throwable t) {
...
}
// 3 咱們這裏要說的是這行
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

initAndRegister() 這個方法咱們已經接觸過兩次了,前面介紹了 1️⃣ Channel 的實例化,實例化過程當中,會執行 Channel 內部 Unsafe 和 Pipeline 的實例化,以及在上面 2️⃣ init(channel) 方法中,會往 pipeline 中添加 handler(pipeline 此時是 head+channelnitializer+tail)。promise

咱們這節終於要揭祕 ChannelInitializer 中的 initChannel 方法了~緩存

如今,咱們繼續往下走,看看 3️⃣ register 這一步:mybatis

ChannelFuture regFuture = config().group().register(channel);ChannelFuture regFuture = config().group().register(channel);

咱們說了,register 這一步是很是關鍵的,它發生在 channel 實例化之後,你們回憶一下當前 channel 中的一些狀況:app

實例化了 JDK 底層的 Channel,設置了非阻塞,實例化了 Unsafe,實例化了 Pipeline,同時往 pipeline 中添加了 head、tail 以及一個 ChannelInitializer 實例。異步

上面的 config().group() 方法會返回前面實例化的 NioEventLoopGroup 的實例,而後調用其 register(channel) 方法:ide

// MultithreadEventLoopGroupoop

@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

next() 方法很簡單,就是選擇線程池中的一個線程(還記得 chooserFactory 嗎),也就是選擇一個 NioEventLoop 實例,這個時候咱們就進入到 NioEventLoop 了。源碼分析

NioEventLoop 的 register(channel) 方法實如今它的父類 SingleThreadEventLoop 中:

@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

上面的代碼實例化了一個 Promise,將當前 channel 帶了進去:

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// promise 關聯了 channel,channel 持有 Unsafe 實例,register 操做就封裝在 Unsafe 中
promise.channel().unsafe().register(this, promise);
return promise;
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// promise 關聯了 channel,channel 持有 Unsafe 實例,register 操做就封裝在 Unsafe 中
promise.channel().unsafe().register(this, promise);
return promise;
}

拿到 channel 中關聯的 Unsafe 實例,而後調用它的 register 方法:

咱們說過,Unsafe 專門用來封裝底層實現,固然這裏也沒那麼「底層」

// AbstractChannel#AbstractUnsafe

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 將這個 eventLoop 實例設置給這個 channel,今後這個 channel 就是有 eventLoop 的了
// 我以爲這一步其實挺關鍵的,由於後續該 channel 中的全部異步操做,都要提交給這個 eventLoop 來執行
AbstractChannel.this.eventLoop = eventLoop;

// 若是發起 register 動做的線程就是 eventLoop 實例中的線程,那麼直接調用 register0(promise)
// 對於咱們來講,它不會進入到這個分支,
// 之因此有這個分支,是由於咱們是能夠 unregister,而後再 register 的,後面再仔細看
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 不然,提交任務給 eventLoop,eventLoop 中的線程會負責調用 register0(promise)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 將這個 eventLoop 實例設置給這個 channel,今後這個 channel 就是有 eventLoop 的了
// 我以爲這一步其實挺關鍵的,由於後續該 channel 中的全部異步操做,都要提交給這個 eventLoop 來執行
AbstractChannel.this.eventLoop = eventLoop;

// 若是發起 register 動做的線程就是 eventLoop 實例中的線程,那麼直接調用 register0(promise)
// 對於咱們來講,它不會進入到這個分支,
// 之因此有這個分支,是由於咱們是能夠 unregister,而後再 register 的,後面再仔細看
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 不然,提交任務給 eventLoop,eventLoop 中的線程會負責調用 register0(promise)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}

到這裏,咱們要明白,NioEventLoop 中是尚未實例化 Thread 實例的。

這幾步涉及到了好幾個類:NioEventLoop、Promise、Channel、Unsafe 等,你們要仔細理清楚它們的關係。

對於咱們前面過來的 register 操做,其實提交到 eventLoop 之後,就直接返回 promise 實例了,剩下的register0 是異步操做,它由 NioEventLoop 實例來完成。

咱們這邊先不繼續往裏分析 register0(promise) 方法,先把前面欠下的 NioEventLoop 中的線程介紹清楚,而後再回來介紹這個 register0 方法。

Channel 實例一旦 register 到了 NioEventLoopGroup 實例中的某個 NioEventLoop 實例,那麼後續該 Channel 的全部操做,都是由該 NioEventLoop 實例來完成的。

這個也很是簡單,由於 Selector 實例是在 NioEventLoop 實例中的,Channel 實例一旦註冊到某個 Selector 實例中,固然也只能在這個實例中處理 NIO 事件。

NioEventLoop 工做流程

前面,咱們在分析線程池的實例化的時候說過,NioEventLoop 中並無啓動 Java 線程。這裏咱們來仔細分析下在 register 過程當中調用的 eventLoop.execute(runnable) 這個方法,這個代碼在父類 SingleThreadEventExecutor 中:

@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判斷添加任務的線程是否就是當前 EventLoop 中的線程
boolean inEventLoop = inEventLoop();

// 添加任務到以前介紹的 taskQueue 中,
// 若是 taskQueue 滿了(默認大小 16),根據咱們以前說的,默認的策略是拋出異常
addTask(task);

if (!inEventLoop) {
// 若是不是 NioEventLoop 內部線程提交的 task,那麼判斷下線程是否已經啓動,沒有的話,就啓動線程
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判斷添加任務的線程是否就是當前 EventLoop 中的線程
boolean inEventLoop = inEventLoop();

// 添加任務到以前介紹的 taskQueue 中,
// 若是 taskQueue 滿了(默認大小 16),根據咱們以前說的,默認的策略是拋出異常
addTask(task);

if (!inEventLoop) {
// 若是不是 NioEventLoop 內部線程提交的 task,那麼判斷下線程是否已經啓動,沒有的話,就啓動線程
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

原來啓動 NioEventLoop 中的線程的方法在這裏。

另外,上節咱們說的 register 操做進到了 taskQueue 中,因此它實際上是被歸類到了非 IO 操做的範疇。

下面是 startThread 的源碼,判斷線程是否已經啓動來決定是否要進行啓動操做:

private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}

咱們按照前面的思路,根據線程沒有啓動的狀況,來看看 doStartThread() 方法:

private void doStartThread() {
assert thread == null;
// 這裏的 executor 你們是否是有點熟悉的感受,它就是一開始咱們實例化 NioEventLoop 的時候傳進來的 ThreadPerTaskExecutor 的實例。它是每次來一個任務,建立一個線程的那種 executor。
// 一旦咱們調用它的 execute 方法,它就會建立一個新的線程,因此這裏終於會建立 Thread 實例
executor.execute(new Runnable() {
@Override
public void run() {
// 看這裏,將 「executor」 中建立的這個線程設置爲 NioEventLoop 的線程!!!
thread = Thread.currentThread();

if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
// 執行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中實現了
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ... 咱們直接忽略掉這裏的代碼
}
}
});
}
private void doStartThread() {
assert thread == null;
// 這裏的 executor 你們是否是有點熟悉的感受,它就是一開始咱們實例化 NioEventLoop 的時候傳進來的 ThreadPerTaskExecutor 的實例。它是每次來一個任務,建立一個線程的那種 executor。
// 一旦咱們調用它的 execute 方法,它就會建立一個新的線程,因此這裏終於會建立 Thread 實例
executor.execute(new Runnable() {
@Override
public void run() {
// 看這裏,將 「executor」 中建立的這個線程設置爲 NioEventLoop 的線程!!!
thread = Thread.currentThread();

if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
// 執行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中實現了
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ... 咱們直接忽略掉這裏的代碼
}
}
});
}

上面線程啓動之後,會執行 NioEventLoop 中的 run() 方法,這是一個很是重要的方法,這個方法確定是沒那麼容易結束的,必然是像 JDK 線程池的 Worker 那樣,不斷地循環獲取新的任務的。它須要不斷地作 select 操做和輪詢 taskQueue 這個隊列。

咱們先來簡單地看一下它的源碼,這裏先不作深刻地介紹:

@Override
protected void run() {
// 代碼嵌套在 for 循環中
for (;;) {
try {
// selectStrategy 終於要派上用場了
// 它有兩個值,一個是 CONTINUE 一個是 SELECT
// 針對這塊代碼,咱們分析一下。
// 1. 若是 taskQueue 不爲空,也就是 hasTasks() 返回 true,
// 那麼執行一次 selectNow(),該方法不會阻塞
// 2. 若是 hasTasks() 返回 false,那麼執行 SelectStrategy.SELECT 分支,
// 進行 select(...),這塊是帶阻塞的
// 這個很好理解,就是按照是否有任務在排隊來決定是否能夠進行阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 若是 !hasTasks(),那麼進到這個 select 分支,這裏 select 帶阻塞的
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}


cancelledKeys = 0;
needsToSelectAgain = false;
// 默認地,ioRatio 的值是 50
final int ioRatio = this.ioRatio;

if (ioRatio == 100) {
// 若是 ioRatio 設置爲 100,那麼先執行 IO 操做,而後在 finally 塊中執行 taskQueue 中的任務
try {
// 1. 執行 IO 操做。由於前面 select 之後,可能有些 channel 是須要處理的。
processSelectedKeys();
} finally {
// 2. 執行非 IO 任務,也就是 taskQueue 中的任務
runAllTasks();
}
} else {
// 若是 ioRatio 不是 100,那麼根據 IO 操做耗時,限制非 IO 操做耗時
final long ioStartTime = System.nanoTime();
try {
// 執行 IO 操做
processSelectedKeys();
} finally {
// 根據 IO 操做消耗的時間,計算執行非 IO 操做(runAllTasks)能夠用多少時間.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
@Override
protected void run() {
// 代碼嵌套在 for 循環中
for (;;) {
try {
// selectStrategy 終於要派上用場了
// 它有兩個值,一個是 CONTINUE 一個是 SELECT
// 針對這塊代碼,咱們分析一下。
// 1. 若是 taskQueue 不爲空,也就是 hasTasks() 返回 true,
// 那麼執行一次 selectNow(),該方法不會阻塞
// 2. 若是 hasTasks() 返回 false,那麼執行 SelectStrategy.SELECT 分支,
// 進行 select(...),這塊是帶阻塞的
// 這個很好理解,就是按照是否有任務在排隊來決定是否能夠進行阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 若是 !hasTasks(),那麼進到這個 select 分支,這裏 select 帶阻塞的
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}


cancelledKeys = 0;
needsToSelectAgain = false;
// 默認地,ioRatio 的值是 50
final int ioRatio = this.ioRatio;

if (ioRatio == 100) {
// 若是 ioRatio 設置爲 100,那麼先執行 IO 操做,而後在 finally 塊中執行 taskQueue 中的任務
try {
// 1. 執行 IO 操做。由於前面 select 之後,可能有些 channel 是須要處理的。
processSelectedKeys();
} finally {
// 2. 執行非 IO 任務,也就是 taskQueue 中的任務
runAllTasks();
}
} else {
// 若是 ioRatio 不是 100,那麼根據 IO 操做耗時,限制非 IO 操做耗時
final long ioStartTime = System.nanoTime();
try {
// 執行 IO 操做
processSelectedKeys();
} finally {
// 根據 IO 操做消耗的時間,計算執行非 IO 操做(runAllTasks)能夠用多少時間.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

上面這段代碼是 NioEventLoop 的核心,這裏介紹兩點:

  1. 首先,會根據 hasTasks() 的結果來決定是執行 selectNow() 仍是 select(oldWakenUp),這個應該好理解。若是有任務正在等待,那麼應該使用無阻塞的 selectNow(),若是沒有任務在等待,那麼就可使用帶阻塞的 select 操做。

  2. ioRatio 控制 IO 操做所佔的時間比重:

  • 若是設置爲 100%,那麼先執行 IO 操做,而後再執行任務隊列中的任務。

  • 若是不是 100%,那麼先執行 IO 操做,而後執行 taskQueue 中的任務,可是須要控制執行任務的總時間。也就是說,非 IO 操做能夠佔用的時間,經過 ioRatio 以及此次 IO 操做耗時計算得出。

咱們這裏先不要去關心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的細節,只要先理解它們分別作什麼事情就能夠了。

回過神來,咱們前面在 register 的時候提交了 register 任務給 NioEventLoop,這是 NioEventLoop 接收到的第一個任務,因此這裏會實例化 Thread 而且啓動,而後進入到 NioEventLoop 中的 run 方法。

繼續 register

咱們回到前面的 register0(promise) 方法,咱們知道,這個 register 任務進入到了 NioEventLoop 的 taskQueue 中,而後會啓動 NioEventLoop 中的線程,該線程會輪詢這個 taskQueue,而後執行這個 register 任務。

注意,此時執行該方法的是 eventLoop 中的線程:

// AbstractChannel

private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
// *** 進行 JDK 底層的操做:Channel 註冊到 Selector 上 ***
doRegister();

neverRegistered = false;
registered = true;
// 到這裏,就算是 registered 了

// 這一步也很關鍵,由於這涉及到了 ChannelInitializer 的 init(channel)
// 咱們以前說過,init 方法會將 ChannelInitializer 內部添加的 handlers 添加到 pipeline 中
pipeline.invokeHandlerAddedIfNeeded();

// 設置當前 promise 的狀態爲 success
// 由於當前 register 方法是在 eventLoop 中的線程中執行的,須要通知提交 register 操做的線程
safeSetSuccess(promise);

// 當前的 register 操做已經成功,該事件應該被 pipeline 上
// 全部關心 register 事件的 handler 感知到,往 pipeline 中扔一個事件
pipeline.fireChannelRegistered();

// 這裏 active 指的是 channel 已經打開
if (isActive()) {
// 若是該 channel 是第一次執行 register,那麼 fire ChannelActive 事件
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 該 channel 以前已經 register 過了,
// 這裏讓該 channel 立馬去監聽通道中的 OP_READ 事件
beginRead();
}
}
} catch (Throwable t) {
...
}
}
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
// *** 進行 JDK 底層的操做:Channel 註冊到 Selector 上 ***
doRegister();

neverRegistered = false;
registered = true;
// 到這裏,就算是 registered 了

// 這一步也很關鍵,由於這涉及到了 ChannelInitializer 的 init(channel)
// 咱們以前說過,init 方法會將 ChannelInitializer 內部添加的 handlers 添加到 pipeline 中
pipeline.invokeHandlerAddedIfNeeded();

// 設置當前 promise 的狀態爲 success
// 由於當前 register 方法是在 eventLoop 中的線程中執行的,須要通知提交 register 操做的線程
safeSetSuccess(promise);

// 當前的 register 操做已經成功,該事件應該被 pipeline 上
// 全部關心 register 事件的 handler 感知到,往 pipeline 中扔一個事件
pipeline.fireChannelRegistered();

// 這裏 active 指的是 channel 已經打開
if (isActive()) {
// 若是該 channel 是第一次執行 register,那麼 fire ChannelActive 事件
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 該 channel 以前已經 register 過了,
// 這裏讓該 channel 立馬去監聽通道中的 OP_READ 事件
beginRead();
}
}
} catch (Throwable t) {
...
}
}

咱們先說掉上面的 doRegister() 方法,而後再說 pipeline。

@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 附 JDK 中 Channel 的 register 方法:
// public final SelectionKey register(Selector sel, int ops, Object att) {...}
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 附 JDK 中 Channel 的 register 方法:
// public final SelectionKey register(Selector sel, int ops, Object att) {...}
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}

咱們能夠看到,這裏作了 JDK 底層的 register 操做,將 SocketChannel(或 ServerSocketChannel) 註冊到 Selector 中,而且能夠看到,這裏的監聽集合設置爲了 0,也就是什麼都不監聽。

固然,也就意味着,後續必定有某個地方會須要修改這個 selectionKey 的監聽集合,否則啥都幹不了

咱們重點來講說 pipeline 操做,咱們以前在介紹 NioSocketChannel 的 pipeline 的時候介紹到,咱們的 pipeline 如今長這個樣子:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

如今,咱們將看到這裏會把 LoggingHandler 和 EchoClientHandler 添加到 pipeline。

咱們繼續看代碼,register 成功之後,執行了如下操做:

pipeline.invokeHandlerAddedIfNeeded();pipeline.invokeHandlerAddedIfNeeded();

你們能夠跟蹤一下,這一步會執行到 pipeline 中 ChannelInitializer 實例的 handlerAdded 方法,在這裏會執行它的 init(context) 方法:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}

而後咱們看下 initChannel(ctx),這裏終於來了咱們以前介紹過的 init(channel) 方法:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
// 1. 將把咱們自定義的 handlers 添加到 pipeline 中
initChannel((C) ctx.channel());
} catch (Throwable cause) {
...
} finally {
// 2. 將 ChannelInitializer 實例從 pipeline 中刪除
remove(ctx);
}
return true;
}
return false;
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
// 1. 將把咱們自定義的 handlers 添加到 pipeline 中
initChannel((C) ctx.channel());
} catch (Throwable cause) {
...
} finally {
// 2. 將 ChannelInitializer 實例從 pipeline 中刪除
remove(ctx);
}
return true;
}
return false;
}

咱們前面也說過,ChannelInitializer 的 init(channel) 被執行之後,那麼其內部添加的 handlers 會進入到 pipeline 中,而後上面的 finally 塊中將 ChannelInitializer 的實例從 pipeline 中刪除,那麼此時 pipeline 就算創建起來了,以下圖:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

其實這裏還有個問題,若是咱們在 ChannelInitializer 中添加的是一個 ChannelInitializer 實例呢?你們能夠考慮下這個狀況。

pipeline 創建了之後,而後咱們繼續往下走,會執行到這一句:

pipeline.fireChannelRegistered();pipeline.fireChannelRegistered();

咱們只要摸清楚了 fireChannelRegistered() 方法,之後碰到其餘像 fireChannelActive()、fireXxx() 等就知道怎麼回事了,它們都是相似的。咱們來看看這句代碼會發生什麼:

// DefaultChannelPipeline

@Override
public final ChannelPipeline fireChannelRegistered() {
// 注意這裏的傳參是 head
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
@Override
public final ChannelPipeline fireChannelRegistered() {
// 注意這裏的傳參是 head
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}

也就是說,咱們往 pipeline 中扔了一個 channelRegistered 事件,這裏的 register 屬於 Inbound 事件,pipeline 接下來要作的就是執行 pipeline 中的 Inbound 類型的 handlers 中的 channelRegistered() 方法。

從上面的代碼,咱們能夠看出,往 pipeline 中扔出 channelRegistered 事件之後,第一個處理的 handler 是 head。

接下來,咱們仍是跟着代碼走,此時咱們來到了 pipeline 的第一個節點 head 的處理中:

// AbstractChannelHandlerContext

// next 此時是 head
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {

EventExecutor executor = next.executor();
// 執行 head 的 invokeChannelRegistered()
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
// next 此時是 head
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {

EventExecutor executor = next.executor();
// 執行 head 的 invokeChannelRegistered()
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}

也就是說,這裏會先執行 head.invokeChannelRegistered() 方法,並且是放到 NioEventLoop 中的 taskQueue 中執行的:

// AbstractChannelHandlerContext

private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// handler() 方法此時會返回 head
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// handler() 方法此時會返回 head
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}

咱們去看 head 的 channelRegistered 方法:

// HeadContext

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// 1. 這一步是 head 對於 channelRegistered 事件的處理。沒有咱們要關心的
invokeHandlerAddedIfNeeded();
// 2. 向後傳播 Inbound 事件
ctx.fireChannelRegistered();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// 1. 這一步是 head 對於 channelRegistered 事件的處理。沒有咱們要關心的
invokeHandlerAddedIfNeeded();
// 2. 向後傳播 Inbound 事件
ctx.fireChannelRegistered();
}

而後 head 會執行 fireChannelRegister() 方法:

// AbstractChannelHandlerContext

@Override
public ChannelHandlerContext fireChannelRegistered() {
// 這裏很關鍵
// findContextInbound() 方法會沿着 pipeline 找到下一個 Inbound 類型的 handler
invokeChannelRegistered(findContextInbound());
return this;
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
// 這裏很關鍵
// findContextInbound() 方法會沿着 pipeline 找到下一個 Inbound 類型的 handler
invokeChannelRegistered(findContextInbound());
return this;
}

注意:pipeline.fireChannelRegistered() 是將 channelRegistered 事件拋到 pipeline 中,pipeline 中的 handlers 準備處理該事件。而 context.fireChannelRegistered() 是一個 handler 處理完了之後,向後傳播給下一個 handler。

它們兩個的方法名字是同樣的,可是來自於不一樣的類。

findContextInbound() 將找到下一個 Inbound 類型的 handler,而後又是重複上面的幾個方法。

我以爲上面這塊代碼不必太糾結,總之就是從 head 中開始,依次往下尋找全部 Inbound handler,執行其 channelRegistered(ctx) 操做。

說了這麼多,咱們的 register 操做算是真正完成了。

下面,咱們回到 initAndRegister 這個方法:

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}

// 咱們上面說完了這行
ChannelFuture regFuture = config().group().register(channel);

// 若是在 register 的過程當中,發生了錯誤
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// 源碼中說得很清楚,若是到這裏,說明後續能夠進行 connect() 或 bind() 了,由於兩種狀況:
// 1. 若是 register 動做是在 eventLoop 中發起的,那麼到這裏的時候,register 必定已經完成
// 2. 若是 register 任務已經提交到 eventLoop 中,也就是進到了 eventLoop 中的 taskQueue 中,
// 因爲後續的 connect 或 bind 也會進入到同一個 eventLoop 的 queue 中,因此必定是會先 register 成功,纔會執行 connect 或 bind
return regFuture;
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}

// 咱們上面說完了這行
ChannelFuture regFuture = config().group().register(channel);

// 若是在 register 的過程當中,發生了錯誤
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// 源碼中說得很清楚,若是到這裏,說明後續能夠進行 connect() 或 bind() 了,由於兩種狀況:
// 1. 若是 register 動做是在 eventLoop 中發起的,那麼到這裏的時候,register 必定已經完成
// 2. 若是 register 任務已經提交到 eventLoop 中,也就是進到了 eventLoop 中的 taskQueue 中,
// 因爲後續的 connect 或 bind 也會進入到同一個 eventLoop 的 queue 中,因此必定是會先 register 成功,纔會執行 connect 或 bind
return regFuture;
}

咱們要知道,不論是服務端的 NioServerSocketChannel 仍是客戶端的 NioSocketChannel,在 bind 或 connect 時,都會先進入 initAndRegister 這個方法,因此咱們上面說的那些,對於二者都是通用的。

你們要記住,register 操做是很是重要的,要知道這一步大概作了哪些事情,register 操做之後,將進入到 bind 或 connect 操做中。

connect 過程和 bind 過程分析

上面咱們介紹的 register 操做很是關鍵,它創建起來了不少的東西,它是 Netty 中 NioSocketChannel 和 NioServerSocketChannel 開始工做的起點。

這一節,咱們來講說 register 以後的 connect 操做和 bind 操做。這節很是簡單。

connect 過程分析

對於客戶端 NioSocketChannel 來講,前面 register 完成之後,就要開始 connect 了,這一步將鏈接到服務端。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 這裏完成了 register 操做
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();

// 這裏咱們不去糾結 register 操做是否 isDone()
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 看這裏
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
....
}
}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 這裏完成了 register 操做
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();

// 這裏咱們不去糾結 register 操做是否 isDone()
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 看這裏
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
....
}
}

這裏你們本身一路點進去,我就不浪費篇幅了。最後,咱們會來到 AbstractChannel 的 connect 方法:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}

咱們看到,connect 操做是交給 pipeline 來執行的。進入 pipeline 中,咱們會發現,connect 這種 Outbound 類型的操做,是從 pipeline 的 tail 開始的:

前面咱們介紹的 register 操做是 Inbound 的,是從 head 開始的

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}

接下來就是 pipeline 的操做了,從 tail 開始,執行 pipeline 上的 Outbound 類型的 handlers 的 connect(...) 方法,那麼真正的底層的 connect 的操做發生在哪裏呢?還記得咱們的 pipeline 的圖嗎?

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

從 tail 開始往前找 out 類型的 handlers,每通過一個 handler,都執行裏面的 connect() 方法,最後會到 head 中,由於 head 也是 Outbound 類型的,咱們須要的 connect 操做就在 head 中,它會負責調用 unsafe 中提供的 connect 方法:

// HeadContext
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
// HeadContext
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}

接下來,咱們來看一看 connect 在 unsafe 類中所謂的底層操做:

// AbstractNioChannel.AbstractNioUnsafe
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
......

boolean wasActive = isActive();
// 你們本身點進去看 doConnect 方法
// 這一步會作 JDK 底層的 SocketChannel connect,而後設置 interestOps 爲 SelectionKey.OP_CONNECT
// 返回值表明是否已經鏈接成功
if (doConnect(remoteAddress, localAddress)) {
// 處理鏈接成功的狀況
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// 下面這塊代碼,在處理鏈接超時的狀況,代碼很簡單
// 這裏用到了 NioEventLoop 的定時任務的功能,這個咱們以前一直都沒有介紹過,由於我以爲也不過重要
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
// AbstractNioChannel.AbstractNioUnsafe
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
......

boolean wasActive = isActive();
// 你們本身點進去看 doConnect 方法
// 這一步會作 JDK 底層的 SocketChannel connect,而後設置 interestOps 爲 SelectionKey.OP_CONNECT
// 返回值表明是否已經鏈接成功
if (doConnect(remoteAddress, localAddress)) {
// 處理鏈接成功的狀況
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// 下面這塊代碼,在處理鏈接超時的狀況,代碼很簡單
// 這裏用到了 NioEventLoop 的定時任務的功能,這個咱們以前一直都沒有介紹過,由於我以爲也不過重要
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}

若是上面的 doConnect 方法返回 false,那麼後續是怎麼處理的呢?

在上一節介紹的 register 操做中,channel 已經 register 到了 selector 上,只不過將 interestOps 設置爲了 0,也就是什麼都不監聽。

而在上面的 doConnect 方法中,咱們看到它在調用底層的 connect 方法後,會設置 interestOps 爲 SelectionKey.OP_CONNECT

剩下的就是 NioEventLoop 的事情了,還記得 NioEventLoop 的 run() 方法嗎?也就是說這裏的 connect 成功之後,這個 TCP 鏈接就創建起來了,後續的操做會在 NioEventLoop.run() 方法中被 processSelectedKeys() 方法處理掉。

bind 過程分析

說完 connect 過程,咱們再來簡單看下 bind 過程:

private ChannelFuture doBind(final SocketAddress localAddress) {
// **前面說的 initAndRegister**
final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// register 動做已經完成,那麼執行 bind 操做
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
......
}
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// **前面說的 initAndRegister**
final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// register 動做已經完成,那麼執行 bind 操做
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
......
}
}

而後一直往裏看,會看到,bind 操做也是要由 pipeline 來完成的:

// AbstractChannel

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

bind 操做和 connect 同樣,都是 Outbound 類型的,因此都是 tail 開始:

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}

最後的 bind 操做又到了 head 中,由 head 來調用 unsafe 提供的 bind 方法:

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

感興趣的讀者本身去看一下 unsafe 中的 bind 方法,很是簡單,bind 操做也不是什麼異步方法,咱們就介紹到這裏了。

本節很是簡單,就是想和你們介紹下 Netty 中各類操做的套路。

往期精彩文章

Mybatis合集:

Mybatis源碼分析之SqlSessionFactory(一)

Mybatis源碼分析之SqlSession和Excutor(二)

Mybatis源碼分析之Mapper執行SQL過程(三)

Mybatis源碼分析之Cache一級緩存原理(四)

Mybatis源碼分析之Cache二級緩存原理 (五)

mybatis結合redis實戰二級緩存(六)

Spring源碼分析:

【Spring源碼】Spring IOC 容器源碼分析(一)

【Spring源碼】Spring IOC 容器源碼分析(二)

【Spring源碼】Spring IOC 容器源碼分析(三)

Spring AOP源碼分析:

Spring AOP 使用介紹,從前世到此生

Spring AOP 源碼解析

NIO源碼分析:

Java NIO:Buffer、Channel 和 Selector

Java 非阻塞 IO 和異步 IO

Tomcat 中的 NIO 源碼分析

Netty源碼分析

認真的 Netty 源碼解析(一)

正在更新中~~~

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

相關文章
相關標籤/搜索