認真的 Netty 源碼解析(一)

本文又是一篇源碼分析文章,其實除了 Doug Lea 的併發包源碼,我是真不太愛寫源碼分析。由於要花很是多的時間,並且不少地方須要反覆組織語言。java

本文將介紹 Netty,Java 平臺上使用最普遍的 NIO 包,它是對 JDK 中的 NIO 實現的一層封裝,讓咱們能更方便地開發 NIO 程序。其實,Netty 不只僅是 NIO 吧,可是,基本上你們都衝着 NIO 來的。spring

我的感受國內對於 Netty 的吹噓是有點過了,主要是不少人靠它吃飯,要麼是搞培訓的,要麼是出書的,巴不得把 Netty 吹上天去,這種現象也是挺很差的,反而使得初學者以爲 Netty 是什麼高深的技術同樣。編程

Netty 的源碼不是很簡單,由於它比較多,並且各個類之間的關係錯綜複雜,不少人說它的源碼很好,這點我以爲通常,真要說好代碼,還得 Doug Lea 的併發源碼比較漂亮,一行行都是精華,不過它們是不一樣類型的,也沒什麼好對比的。Netty 源碼好就好在它的接口使用比較靈活,每每接口好用的框架,源碼都不會太簡單。數組

本文將立足於源碼分析,因此讀者須要先掌握 NIO 的基礎知識,至少我以前寫的 《Java NIO:Buffer、Channel 和 Selector》 中介紹的基礎知識要清楚,若是讀者已經對 Netty 有些瞭解,或者使用過,那就更好了。promise

  • 本文只介紹 TCP 相關的內容,Netty 對於其餘協議的支持,不在本文的討論範圍內。安全

  • 和併發包的源碼分析不同,我不可能一行一行源碼說,因此有些異常分支是會直接略過,除非我以爲須要介紹。服務器

  • Netty 源碼一直在更新,各版本之間有些差別,我是按照 2018-09-06 的最新版本 4.1.25.Final 來進行介紹的。併發

建議初學者在看完本文之後,能夠去翻翻《Netty In Action》,網上也能夠找到中文文字版的。app

準備

學習源碼,一開始確定是準備環境。框架

我喜歡用 maven,也喜歡 Spring Boot,因此我通常先到 https://start.spring.io/ 準備一個最簡單的腳手架。

10 秒搞定腳手架,而後就是導入到 Intellij 中,若是用新版本的 Spring Boot,可能還須要等待下載依賴,期間打開 https://mvnrepository.com/ 搜索立刻要用到的 maven 依賴。

Netty 分爲好些模塊,有 netty-handler、netty-buffer、netty-transport、netty-common 等等,也有一個 netty-all,它包含了全部的模塊。

既然咱們是源碼分析,那麼天然是用一個最簡單的。netty-all 不是最好的選擇,netty-example 纔是:

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-example</artifactId>
<version>4.1.25.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-example</artifactId>
<version>4.1.25.Final</version>
</dependency>

它不只能夠解決咱們的依賴,並且 example 裏面的示例很是適合咱們學習使用。

Echo 例子

Netty 做爲 NIO 的庫,天然既能夠做爲服務端接受請求,也能夠做爲客戶端發起請求。使用 Netty 開發客戶端或服務端都是很是簡單的,Netty 作了很好的封裝,咱們一般只要開發一個或多個 handler 用來處理咱們的自定義邏輯就能夠了。

下面,咱們來看一個常常會見到的例子,它叫 Echo,也就是回聲,客戶端傳過去什麼值,服務端原樣返回什麼值。

打開 netty-example 的源碼,把 echo 包下面的代碼複製出來玩一玩。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

左邊是服務端代碼,右邊是客戶端代碼。

上面的代碼基本就是模板代碼,每次使用都是這一個套路,惟一須要咱們開發的部分是 handler(…) 和 childHandler(…) 方法中指定的各個 handler,如 EchoServerHandler 和 EchoClientHandler,固然 Netty 源碼也給咱們提供了不少的 handler,好比上面的 LoggingHandler,它就是 Netty 源碼中爲咱們提供的,須要的時候直接拿過來用就行了。

咱們先來看一下上述代碼中涉及到的一些內容:

  • ServerBootstrap 類用於建立服務端實例,Bootstrap 用於建立客戶端實例。

  • 兩個 EventLoopGroup:bossGroup 和 workerGroup,它們涉及的是 Netty 的線程模型,能夠看到服務端有兩個 group,而客戶端只有一個,它們就是 Netty 中的線程池。

  • Netty 中的 Channel,沒有直接使用 Java 原生的 ServerSocketChannel 和 SocketChannel,而是包裝了 NioServerSocketChannel 和 NioSocketChannel 與之對應。

    固然,也有對其餘協議的支持,如支持 UDP 協議的 NioDatagramChannel,本文只關心 TCP 相關的。

  • 左邊 handler(…) 方法指定了一個 handler(LoggingHandler),這個 handler 是給服務端收到新的請求的時候處理用的。右邊 handler(...) 方法指定了客戶端處理請求過程當中須要使用的 handlers。

    若是你想在 EchoServer 中也指定多個 handler,也能夠像右邊的 EchoClient 同樣使用 ChannelInitializer

  • 左邊 childHandler(…) 指定了 childHandler,這邊的 handlers 是給新建立的鏈接用的,咱們知道服務端 ServerSocketChannel 在 accept 一個鏈接之後,須要建立 SocketChannel 的實例,childHandler(…) 中設置的 handler 就是用於處理新建立的 SocketChannel 的,而不是用來處理 ServerSocketChannel 實例的。

  • pipeline:handler 能夠指定多個(須要上面的 ChannelInitializer 類輔助),它們會組成了一個 pipeline,它們其實就相似攔截器的概念,如今只要記住一點,每一個 NioSocketChannel 或 NioServerSocketChannel 實例內部都會有一個 pipeline 實例。pipeline 中還涉及到 handler 的執行順序。

  • ChannelFuture:這個涉及到 Netty 中的異步編程,和 JDK 中的 Future 接口相似。

對於不瞭解 Netty 的讀者,也不要有什麼壓力,我會一一介紹它們,本文主要面向新手,我以爲比較難理解或比較重要的部分,會花比較大的篇幅來介紹清楚。

上面的源碼中沒有展現消息發送和消息接收的處理,此部分我會在介紹完上面的這些內容之後再進行介紹。

下面,將分塊來介紹這些內容。鑑於讀者對 NIO 或 Netty 的瞭解程度可能良莠不齊,爲了照顧初學者,不少地方須要囉嗦一些,因此但願讀者一節一節往下看,對於本身熟悉的內容能夠適當看快一些。

Netty 中的 Channel

這節咱們來看看 NioSocketChannel 是怎麼和 JDK 底層的 SocketChannel 聯繫在一塊兒的,它們是一對一的關係。NioServerSocketChannel 和 ServerSocketChannel 同理,也是一對一的關係。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

在 Bootstrap(客戶端) 和 ServerBootstrap(服務端) 的啓動過程當中都會調用 channel(…) 方法:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

下面,咱們來看 channel(…) 方法的源碼:

// AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
// AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

咱們能夠看到,這個方法只是設置了 channelFactory 爲 ReflectiveChannelFactory 的一個實例,而後咱們看下這裏的 ReflectiveChannelFactory 究竟是什麼:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

newChannel() 方法是 ChannelFactory 接口中的惟一方法,工廠模式你們都很熟悉。咱們能夠看到,ReflectiveChannelFactory#newChannel() 方法中使用了反射調用 Channel 的無參構造方法來建立 Channel,咱們只要知道,ChannelFactory 的 newChannel() 方法何時會被調用就能夠了。

  • 對於 NioSocketChannel,因爲它充當客戶端的功能,它的建立時機在 connect(…) 的時候;

  • 對於 NioServerSocketChannel 來講,它充當服務端功能,它的建立時機在綁定端口 bind(…) 的時候。

接下來,咱們來簡單追蹤下充當客戶端的 Bootstrap 中 NioSocketChannel 的建立過程,看看 NioSocketChannel 是怎麼和 JDK 中的 SocketChannel 關聯在一塊兒的:

// Bootstrap
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
// Bootstrap
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

而後再往裏看,到這個方法:

public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
// validate 只是校驗一下各個參數是否是正確設置了
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
// validate 只是校驗一下各個參數是否是正確設置了
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}

繼續:

// 再往裏就到這裏了
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 咱們要說的部分在這裏
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
......
}
// 再往裏就到這裏了
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 咱們要說的部分在這裏
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
......
}

而後,咱們看 initAndRegister() 方法:

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 前面咱們說過,這裏會進行 Channel 的實例化
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
...
return regFuture;
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 前面咱們說過,這裏會進行 Channel 的實例化
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
...
return regFuture;
}

咱們找到了 channel = channelFactory.newChannel() 這行代碼,根據前面說的,這裏會調用相應 Channel 的無參構造方法。

而後咱們就能夠去看 NioSocketChannel 的構造方法了:

public NioSocketChannel() {
// SelectorProvider 實例用於建立 JDK 的 SocketChannel 實例
this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
// 看這裏,newSocket(provider) 方法會建立 JDK 的 SocketChannel
this(newSocket(provider));
}
public NioSocketChannel() {
// SelectorProvider 實例用於建立 JDK 的 SocketChannel 實例
this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
// 看這裏,newSocket(provider) 方法會建立 JDK 的 SocketChannel
this(newSocket(provider));
}

咱們能夠看到,在調用 newSocket(provider) 的時候,會建立 JDK NIO 的一個 SocketChannel 實例:

private static SocketChannel newSocket(SelectorProvider provider) {
try {
// 建立 SocketChannel 實例
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
// 建立 SocketChannel 實例
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}

NioServerSocketChannel 同理,也很是簡單,從 ServerBootstrap#bind(...) 方法一路點進去就清楚了。

因此咱們知道了,NioSocketChannel 在實例化過程當中,會先實例化 JDK 底層的 SocketChannel,NioServerSocketChannel 也同樣,會先實例化 ServerSocketChannel 實例:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

說到這裏,咱們順便再繼續往裏看一下 NioSocketChannel 的構造方法:

public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

剛纔咱們看到這裏,newSocket(provider) 建立了底層的 SocketChannel 實例,咱們繼續往下看構造方法:

public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

上面有兩行代碼,第二行代碼很簡單,實例化了內部的 NioSocketChannelConfig 實例,它用於保存 channel 的配置信息,這裏沒有咱們如今須要關心的內容,直接跳過。

第一行調用父類構造器,除了設置屬性外,還設置了 SocketChannel 的非阻塞模式:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 毫無疑問,客戶端關心的是 OP_READ 事件,等待讀取服務端返回數據
super(parent, ch, SelectionKey.OP_READ);
}

// 而後是到這裏
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 咱們看到這裏只是保存了 SelectionKey.OP_READ 這個信息,在後面的時候會用到
this.readInterestOp = readInterestOp;
try {
// ******設置 channel 的非阻塞模式******
ch.configureBlocking(false);
} catch (IOException e) {
......
}
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 毫無疑問,客戶端關心的是 OP_READ 事件,等待讀取服務端返回數據
super(parent, ch, SelectionKey.OP_READ);
}

// 而後是到這裏
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 咱們看到這裏只是保存了 SelectionKey.OP_READ 這個信息,在後面的時候會用到
this.readInterestOp = readInterestOp;
try {
// ******設置 channel 的非阻塞模式******
ch.configureBlocking(false);
} catch (IOException e) {
......
}
}

NioServerSocketChannel 的構造方法相似,也設置了非阻塞,而後設置服務端關心的 SelectionKey.OP_ACCEPT 事件:

public NioServerSocketChannel(ServerSocketChannel channel) {
// 對於服務端來講,關心的是 SelectionKey.OP_ACCEPT 事件,等待客戶端鏈接
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 對於服務端來講,關心的是 SelectionKey.OP_ACCEPT 事件,等待客戶端鏈接
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

這節關於 Channel 的內容咱們先介紹這麼多,主要就是實例化了 JDK 層的 SocketChannel 或 ServerSocketChannel,而後設置了非阻塞模式,咱們後面再繼續深刻下去。

Netty 中的 Future、Promise

Netty 中很是多的異步調用,因此在介紹更多 NIO 相關的內容以前,咱們來看看它的異步接口是怎麼使用的。

前面咱們在介紹 Echo 例子的時候,已經用過了 ChannelFuture 這個接口了:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

爭取在看完本節後,讀者能搞清楚上面的這幾行劃線部分是怎麼走的。

關於 Future 接口,我想你們應該都很熟悉,用得最多的就是在使用 Java 的線程池 ThreadPoolExecutor 的時候了。在 submit 一個任務到線程池中的時候,返回的就是一個 Future 實例,經過它來獲取提交的任務的執行狀態和最終的執行結果,咱們最經常使用它的 isDone() 和 get() 方法。

下面是 JDK 中的 Future 接口 java.util.concurrent.Future:

public interface Future<V> {
// 取消該任務
boolean cancel(boolean mayInterruptIfRunning);
// 任務是否已取消
boolean isCancelled();
// 任務是否已完成
boolean isDone();
// 阻塞獲取任務執行結果
V get() throws InterruptedException, ExecutionException;
// 帶超時參數的獲取任務執行結果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Future<V> {
// 取消該任務
boolean cancel(boolean mayInterruptIfRunning);
// 任務是否已取消
boolean isCancelled();
// 任務是否已完成
boolean isDone();
// 阻塞獲取任務執行結果
V get() throws InterruptedException, ExecutionException;
// 帶超時參數的獲取任務執行結果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Netty 中的 Future 接口(同名)繼承了 JDK 中的 Future 接口,而後添加了一些方法:

// io.netty.util.concurrent.Future

public interface Future<V> extends java.util.concurrent.Future<V> {

// 是否成功
boolean isSuccess();

// 是否可取消
boolean isCancellable();

// 若是任務執行失敗,這個方法返回異常信息
Throwable cause();

// 添加 Listener 來進行回調
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 阻塞等待任務結束,若是任務失敗,將「致使失敗的異常」從新拋出來
Future<V> sync() throws InterruptedException;
// 不響應中斷的 sync(),這個你們應該都很熟了
Future<V> syncUninterruptibly();

// 阻塞等待任務結束,和 sync() 功能是同樣的,不過若是任務失敗,它不會拋出執行過程當中的異常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);

// 獲取執行結果,不阻塞。咱們都知道 java.util.concurrent.Future 中的 get() 是阻塞的
V getNow();

// 取消任務執行,若是取消成功,任務會由於 CancellationException 異常而致使失敗
// 也就是 isSuccess()==false,同時上面的 cause() 方法返回 CancellationException 的實例。
// mayInterruptIfRunning 說的是:是否對正在執行該任務的線程進行中斷(這樣才能中止該任務的執行),
// 彷佛 Netty 中 Future 接口的各個實現類,都沒有使用這個參數
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
public interface Future<V> extends java.util.concurrent.Future<V> {

// 是否成功
boolean isSuccess();

// 是否可取消
boolean isCancellable();

// 若是任務執行失敗,這個方法返回異常信息
Throwable cause();

// 添加 Listener 來進行回調
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 阻塞等待任務結束,若是任務失敗,將「致使失敗的異常」從新拋出來
Future<V> sync() throws InterruptedException;
// 不響應中斷的 sync(),這個你們應該都很熟了
Future<V> syncUninterruptibly();

// 阻塞等待任務結束,和 sync() 功能是同樣的,不過若是任務失敗,它不會拋出執行過程當中的異常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);

// 獲取執行結果,不阻塞。咱們都知道 java.util.concurrent.Future 中的 get() 是阻塞的
V getNow();

// 取消任務執行,若是取消成功,任務會由於 CancellationException 異常而致使失敗
// 也就是 isSuccess()==false,同時上面的 cause() 方法返回 CancellationException 的實例。
// mayInterruptIfRunning 說的是:是否對正在執行該任務的線程進行中斷(這樣才能中止該任務的執行),
// 彷佛 Netty 中 Future 接口的各個實現類,都沒有使用這個參數
@Override
boolean cancel(boolean mayInterruptIfRunning);
}

看完上面的 Netty 的 Future 接口,咱們能夠發現,它加了 sync() 和 await() 用於阻塞等待,還加了 Listeners,只要任務結束去回調 Listener 們就能夠了,那麼咱們就不必定要主動調用 isDone() 來獲取狀態,或經過 get() 阻塞方法來獲取值。

因此它其實有兩種使用範式

順便說下 sync() 和 await() 的區別:sync() 內部會先調用 await() 方法,等 await() 方法返回後,會檢查下這個任務是否失敗,若是失敗,從新將致使失敗的異常拋出來。也就是說,若是使用 await(),任務拋出異常後,await() 方法會返回,可是不會拋出異常,而 sync() 方法返回的同時會拋出異常。

咱們也能夠看到,Future 接口沒有和 IO 操做關聯在一塊兒,仍是比較純淨的接口。

接下來,咱們來看 Future 接口的子接口 ChannelFuture,這個接口用得最多,它將和 IO 操做中的 Channel 關聯在一塊兒了,用於異步處理 Channel 中的事件。

public interface ChannelFuture extends Future<Void> {

// ChannelFuture 關聯的 Channel
Channel channel();

// 覆寫如下幾個方法,使得它們返回值爲 ChannelFuture 類型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();

@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();

// 用來標記該 future 是 void 的,
// 這樣就不容許使用 addListener(...), sync(), await() 以及它們的幾個重載方法
boolean isVoid();
}
public interface ChannelFuture extends Future<Void> {

// ChannelFuture 關聯的 Channel
Channel channel();

// 覆寫如下幾個方法,使得它們返回值爲 ChannelFuture 類型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();

@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();

// 用來標記該 future 是 void 的,
// 這樣就不容許使用 addListener(...), sync(), await() 以及它們的幾個重載方法
boolean isVoid();
}

咱們看到,ChannelFuture 接口相對於 Future 接口,除了將 channel 關聯進來,沒有增長什麼東西。還有個 isVoid() 方法算是不那麼重要的存在吧。其餘幾個都是方法覆寫,爲了讓返回值類型變爲 ChannelFuture,而不是原來的 Future。

這裏有點跳,咱們來介紹下 Promise 接口,它和 ChannelFuture 接口無關,而是和前面的 Future 接口相關,Promise 這個接口很是重要。

Promise 接口和 ChannelFuture 同樣,也繼承了 Netty 的 Future 接口,而後加了一些 Promise 的內容:

public interface Promise<V> extends Future<V> {

// 標記該 future 成功及設置其執行結果,而且會通知全部的 listeners。
// 若是該操做失敗,將拋出異常(失敗指的是該 future 已經有告終果了,成功的結果,或者失敗的結果)
Promise<V> setSuccess(V result);

// 和 setSuccess 方法同樣,只不過若是失敗,它不拋異常,返回 false
boolean trySuccess(V result);

// 標記該 future 失敗,及其失敗緣由。
// 若是失敗,將拋出異常(失敗指的是已經有告終果了)
Promise<V> setFailure(Throwable cause);

// 標記該 future 失敗,及其失敗緣由。
// 若是已經有結果,返回 false,不拋出異常
boolean tryFailure(Throwable cause);

// 標記該 future 不能夠被取消
boolean setUncancellable();

// 這裏和 ChannelFuture 同樣,對這幾個方法進行覆寫,目的是爲了返回 Promise 類型的實例
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();

@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
public interface Promise<V> extends Future<V> {

// 標記該 future 成功及設置其執行結果,而且會通知全部的 listeners。
// 若是該操做失敗,將拋出異常(失敗指的是該 future 已經有告終果了,成功的結果,或者失敗的結果)
Promise<V> setSuccess(V result);

// 和 setSuccess 方法同樣,只不過若是失敗,它不拋異常,返回 false
boolean trySuccess(V result);

// 標記該 future 失敗,及其失敗緣由。
// 若是失敗,將拋出異常(失敗指的是已經有告終果了)
Promise<V> setFailure(Throwable cause);

// 標記該 future 失敗,及其失敗緣由。
// 若是已經有結果,返回 false,不拋出異常
boolean tryFailure(Throwable cause);

// 標記該 future 不能夠被取消
boolean setUncancellable();

// 這裏和 ChannelFuture 同樣,對這幾個方法進行覆寫,目的是爲了返回 Promise 類型的實例
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();

@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}

可能有些讀者對 Promise 的概念不是很熟悉,這裏簡單說兩句。

我以爲只要明白一點,Promise 實例內部是一個任務,任務的執行每每是異步的,一般是一個線程池來處理任務。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 未來會被某個執行任務的線程在執行完成之後調用,同時那個線程在調用 setSuccess(result) 或 setFailure(t) 後會回調 listeners 的回調函數(固然,回調的具體內容不必定要由執行任務的線程本身來執行,它能夠建立新的線程來執行,也能夠將回調任務提交到某個線程池來執行)。並且,一旦 setSuccess(...) 或 setFailure(...) 後,那些 await() 或 sync() 的線程就會從等待中返回。

因此這裏就有兩種編程方式,一種是用 await(),等 await() 方法返回後,獲得 promise 的執行結果,而後處理它;另外一種就是提供 Listener 實例,咱們不太關心任務何時會執行完,只要它執行完了之後會去執行 listener 中的處理方法就行。

接下來,咱們再來看下 ChannelPromise,它繼承了前面介紹的 ChannelFuture 和 Promise 接口。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

ChannelPromise 接口在 Netty 中使用得比較多,由於它綜合了 ChannelFuture 和 Promise 兩個接口:

/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

// 覆寫 ChannelFuture 中的 channel() 方法,其實這個方法一點沒變
@Override
Channel channel();

// 下面幾個方法是覆寫 Promise 中的接口,爲了返回值類型是 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);

// 到這裏你們應該都熟悉了,下面幾個方法的覆寫也是爲了獲得 ChannelPromise 類型的實例
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();

/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
// 咱們忽略這個方法吧。
ChannelPromise unvoid();
}
/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

// 覆寫 ChannelFuture 中的 channel() 方法,其實這個方法一點沒變
@Override
Channel channel();

// 下面幾個方法是覆寫 Promise 中的接口,爲了返回值類型是 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);

// 到這裏你們應該都熟悉了,下面幾個方法的覆寫也是爲了獲得 ChannelPromise 類型的實例
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();

/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
// 咱們忽略這個方法吧。
ChannelPromise unvoid();
}

咱們能夠看到,它綜合了 ChannelFuture 和 Promise 中的方法,只不過經過覆寫將返回值都變爲 ChannelPromise 了而已,沒有增長什麼新的功能。

小結一下,咱們上面介紹了幾個接口,Future 以及它的子接口 ChannelFuture 和 Promise,而後是 ChannelPromise 接口同時繼承了 ChannelFuture 和 Promise。

我把這幾個接口的主要方法列在一塊兒,這樣你們看得清晰些:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

接下來,咱們須要來一個實現類,這樣才能比較直觀地看出它們是怎麼使用的,由於上面的這些都是接口定義,具體還得看實現類是怎麼工做的。

下面,咱們來介紹下 DefaultPromise 這個實現類,這個類很經常使用,它的源碼也不短,咱們先介紹幾個關鍵的內容,而後介紹一個示例使用。

首先,咱們看下它有哪些屬性:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 保存執行結果
private volatile Object result;
// 執行任務的線程池,promise 持有 executor 的引用,這個其實有點奇怪了
// 由於「任務」其實不必知道本身在哪裏被執行的
private final EventExecutor executor;
// 監聽者,回調函數,任務結束後(正常或異常結束)執行
private Object listeners;

// 等待這個 promise 的線程數(調用sync()/await()進行等待的線程數量)
private short waiters;

// 是否正在喚醒等待線程,用於防止重複執行喚醒,否則會重複執行 listeners 的回調方法
private boolean notifyingListeners;
......
}
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 保存執行結果
private volatile Object result;
// 執行任務的線程池,promise 持有 executor 的引用,這個其實有點奇怪了
// 由於「任務」其實不必知道本身在哪裏被執行的
private final EventExecutor executor;
// 監聽者,回調函數,任務結束後(正常或異常結束)執行
private Object listeners;

// 等待這個 promise 的線程數(調用sync()/await()進行等待的線程數量)
private short waiters;

// 是否正在喚醒等待線程,用於防止重複執行喚醒,否則會重複執行 listeners 的回調方法
private boolean notifyingListeners;
......
}

能夠看出,此類實現了 Promise,可是沒有實現 ChannelFuture,因此它和 Channel 聯繫不起來。

別急,咱們後面會碰到另外一個類 DefaultChannelPromise 的使用,這個類是綜合了 ChannelFuture 和 Promise 的,可是它的實現其實大部分都是繼承自這裏的 DefaultPromise 類的。

說完上面的屬性之後,你們能夠看下 setSuccess(V result) 、trySuccess(V result) 和 setFailure(Throwable cause) 、 tryFailure(Throwable cause) 這幾個方法:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

看出 setSuccess(result) 和 trySuccess(result) 的區別了嗎?

上面幾個方法都很是簡單,先設置好值,而後執行監聽者們的回調方法。notifyListeners() 方法感興趣的讀者也能夠看一看,不過它還涉及到 Netty 線程池的一些內容,咱們尚未介紹到線程池,這裏就不展開了。上面的代碼,在 setSuccess0 或 setFailure0 方法中都會喚醒阻塞在 sync() 或 await() 的線程

另外,就是能夠看下 sync() 和 await() 的區別,其餘的我以爲隨便看看就行了。

@Override
public Promise<V> sync() throws InterruptedException {
await();
// 若是任務是失敗的,從新拋出相應的異常
rethrowIfFailed();
return this;
}
@Override
public Promise<V> sync() throws InterruptedException {
await();
// 若是任務是失敗的,從新拋出相應的異常
rethrowIfFailed();
return this;
}

接下來,咱們來寫個實例代碼吧:

 public static void main(String[] args) {

// 構造線程池
EventExecutor executor = new DefaultEventExecutor();

// 建立 DefaultPromise 實例
Promise promise = new DefaultPromise(executor);

// 下面給這個 promise 添加兩個 listener
promise.addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
System.out.println("任務結束,結果:" + future.get());
} else {
System.out.println("任務失敗,異常:" + future.cause());
}
}
}).addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
System.out.println("任務結束,balabala...");
}
});

// 提交任務到線程池,五秒後執行結束,設置執行 promise 的結果
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
// 設置 promise 的結果
// promise.setFailure(new RuntimeException());
promise.setSuccess(123456);
}
});

// main 線程阻塞等待執行結果
try {
promise.sync();
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {

// 構造線程池
EventExecutor executor = new DefaultEventExecutor();

// 建立 DefaultPromise 實例
Promise promise = new DefaultPromise(executor);

// 下面給這個 promise 添加兩個 listener
promise.addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
System.out.println("任務結束,結果:" + future.get());
} else {
System.out.println("任務失敗,異常:" + future.cause());
}
}
}).addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
System.out.println("任務結束,balabala...");
}
});

// 提交任務到線程池,五秒後執行結束,設置執行 promise 的結果
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
// 設置 promise 的結果
// promise.setFailure(new RuntimeException());
promise.setSuccess(123456);
}
});

// main 線程阻塞等待執行結果
try {
promise.sync();
} catch (InterruptedException e) {
}
}

運行代碼,兩個 listener 將在 5 秒後將輸出:

任務結束,結果:123456
任務結束,balabala...
任務結束,結果:123456
任務結束,balabala...

讀者這裏能夠試一下 sync() 和 await() 的區別,在任務中調用 promise.setFailure(new RuntimeException()) 試試看。

上面的代碼中,你們可能會對線程池 executor 和 promise 之間的關係感到有點迷惑。讀者應該也要清楚,具體的任務不必定就要在這個 executor 中被執行。任務結束之後,須要調用 promise.setSuccess(result) 做爲通知。

一般來講,promise 表明的 future 是不須要和線程池攪在一塊兒的,future 只關心任務是否結束以及任務的執行結果,至因而哪一個線程或哪一個線程池執行的任務,future 實際上是不關心的。

不過 Netty 畢竟不是要建立一個通用的線程池實現,而是和它要處理的 IO 息息相關的,因此咱們只不過要理解它就行了。

這節就說這麼多吧,咱們回過頭來再看一下這張圖,看看你們是否是看懂了這節內容:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

咱們就說說上圖左邊的部分吧,雖然咱們還不知道 bind() 操做中具體會作什麼工做,可是咱們應該能夠猜出一二。

顯然,main 線程調用 b.bind(port) 這個方法會返回一個 ChannelFuture,bind() 是一個異步方法,當某個執行線程執行了真正的綁定操做後,那個執行線程必定會標記這個 future 爲成功(咱們假定 bind 會成功),而後這裏的 sync() 方法(main 線程)就會返回了。

若是 bind(port) 失敗,咱們知道,sync() 方法會將異常拋出來,而後就會執行到 finally 塊了。

一旦綁定端口 bind 成功,進入下面一行,f.channel() 方法會返回該 future 關聯的 channel。

channel.closeFuture() 也會返回一個 ChannelFuture,而後調用了 sync() 方法,這個 sync() 方法返回的條件是:有其餘的線程關閉了 NioServerSocketChannel,每每是由於須要停掉服務了,而後那個線程會設置 future 的狀態( setSuccess(result) 或 setFailure(cause) ),這個 sync() 方法纔會返回。

這節就到這裏,但願你們對 Netty 中的異步編程有些瞭解,後續碰到源碼的時候能知道是怎麼使用的了。

ChannelPipeline,和 Inbound、Outbound

我想不少讀者應該或多或少都有 Netty 中 pipeline 的概念。前面咱們說了,使用 Netty 的時候,咱們一般就只要寫一些自定義的 handler 就能夠了,咱們定義的這些 handler 會組成一個 pipeline,用於處理 IO 事件,這個和咱們平時接觸的 Filter 或 Interceptor 表達的差很少是一個意思。

每一個 Channel 內部都有一個 pipeline,pipeline 由多個 handler 組成,handler 之間的順序是很重要的,由於 IO 事件將按照順序順次通過 pipeline 上的 handler,這樣每一個 handler 能夠專一於作一點點小事,由多個 handler 組合來完成一些複雜的邏輯。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

從圖中,咱們知道這是一個雙向鏈表。

首先,咱們看兩個重要的概念:Inbound 和 Outbound。在 Netty 中,IO 事件被分爲 Inbound 事件和 Outbound 事件。

Outbound 的 out 指的是 出去,有哪些 IO 事件屬於此類呢?好比 connect、write、flush 這些 IO 操做是往外部方向進行的,它們就屬於 Outbound 事件。

其餘的,諸如 accept、read 這種就屬於 Inbound 事件。

好比客戶端在發起請求的時候,須要 1️⃣connect 到服務器,而後 2️⃣write 數據傳到服務器,再而後 3️⃣read 服務器返回的數據,前面的 connect 和 write 就是 out 事件,後面的 read 就是 in 事件。

好比不少初學者看不懂下面的這段代碼,這段代碼用於服務端的 childHandler 中:

1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());
1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());

初學者確定都納悶,覺得這個順序寫錯了,應該是先 decode 客戶端過來的數據,而後用 BizHandler 處理業務邏輯,最後再 encode 數據而後返回給客戶端,因此添加的順序應該是 1 -> 3 -> 2 纔對。

其實這裏的三個 handler 是分組的,分爲 Inbound(1 和 3) 和 Outbound(2):

1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());
1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());
  • 客戶端鏈接進來的時候,讀取(read)客戶端請求數據的操做是 Inbound 的,因此會先使用 1,而後是 3 對處理進行處理;

  • 處理完數據後,返回給客戶端數據的 write 操做是 Outbound 的,此時使用的是 2。

因此雖然添加順序有點怪,可是執行順序實際上是按照 1 -> 3 -> 2 進行的。

若是咱們在上面的基礎上,加上下面的第四行,這是一個 OutboundHandler:

4. pipeline.addLast(new OutboundHandlerA());4. pipeline.addLast(new OutboundHandlerA());

那麼執行順序是否是就是 1 -> 3 -> 2 -> 4 呢?答案是:不是的。

對於 Inbound 操做,按照添加順序執行每一個 Inbound 類型的 handler;而對於 Outbound 操做,是反着來的,從後往前,順次執行 Outbound 類型的 handler。

因此,上面的順序應該是先 1 後 3,它們是 Inbound 的,而後是 4,最後纔是 2,它們兩個是 Outbound 的。說實話,我真不喜歡這種組織方式。

到這裏,我想你們應該都知道 Inbound 和 Outbound 了吧?下面咱們來介紹它們的接口使用。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

定義處理 Inbound 事件的 handler 須要實現 ChannelInboundHandler,定義處理 Outbound 事件的 handler 須要實現 ChannelOutboundHandler。最下面的三個類,是 Netty 提供的適配器,特別的,若是咱們但願定義一個 handler 能同時處理 Inbound 和 Outbound 事件,能夠經過繼承中間的 ChannelDuplexHandler 的方式,好比 LoggingHandler 這種既能夠用來處理 Inbound 也能夠用來處理 Outbound 事件的 handler。

有了 Inbound 和 Outbound 的概念之後,咱們來開始介紹 Pipeline 的源碼。

咱們說過,一個 Channel 關聯一個 pipeline,NioSocketChannel 和 NioServerSocketChannel 在執行構造方法的時候,都會走到它們的父類 AbstractChannel 的構造方法中:

protected AbstractChannel(Channel parent) {
this.parent = parent;
// 給每一個 channel 分配一個惟一 id
id = newId();
// 每一個 channel 內部須要一個 Unsafe 的實例
unsafe = newUnsafe();
// 每一個 channel 內部都會建立一個 pipeline
pipeline = newChannelPipeline();
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 給每一個 channel 分配一個惟一 id
id = newId();
// 每一個 channel 內部須要一個 Unsafe 的實例
unsafe = newUnsafe();
// 每一個 channel 內部都會建立一個 pipeline
pipeline = newChannelPipeline();
}

上面的三行代碼中,id 比較不重要,Netty 中的 Unsafe 實例其實挺重要的,這裏簡單介紹一下。

在 JDK 的源碼中,sun.misc.Unsafe 類提供了一些底層操做的能力,它設計出來是給 JDK 中的源碼使用的,好比 AQS、ConcurrentHashMap 等,咱們在以前的併發包的源碼分析中也看到了不少它們使用 Unsafe 的場景,這個 Unsafe 類不是給咱們的代碼使用的,是給 JDK 源碼使用的(須要的話,咱們也是能夠獲取它的實例的)。

Unsafe 類的構造方法是 private 的,可是它提供了 getUnsafe() 這個靜態方法:

Unsafe unsafe = Unsafe.getUnsafe();Unsafe unsafe = Unsafe.getUnsafe();

你們能夠試一下,上面這行代碼編譯沒有問題,可是執行的時候會拋 java.lang.SecurityException 異常,由於它就不是給咱們的代碼用的。

可是若是你就是想獲取 Unsafe 的實例,能夠經過下面這個代碼獲取到:

Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);

Netty 中的 Unsafe 也是一樣的意思,它封裝了 Netty 中會使用到的 JDK 提供的 NIO 接口,好比將 channel 註冊到 selector 上,好比 bind 操做,好比 connect 操做等,這些操做都是稍微偏底層一些。Netty 一樣也是不但願咱們的業務代碼使用 Unsafe 的實例,它是提供給 Netty 中的源碼使用的。

不過,對於咱們源碼分析來講,咱們仍是會有不少時候須要分析 Unsafe 中的源碼的

關於 Unsafe,咱們後面用到了再說,這裏只要知道,它封裝了大部分須要訪問 JDK 的 NIO 接口的操做就行了。這裏咱們繼續將焦點放在實例化 pipeline 上:

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

這裏開始調用 DefaultChannelPipeline 的構造方法,並把當前 channel 的引用傳入:

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

這裏實例化了 tail 和 head 這兩個 handler。tail 實現了 ChannelInboundHandler 接口,而 head 實現了 ChannelOutboundHandler 和 ChannelInboundHandler 兩個接口,而且最後兩行代碼將 tail 和 head 鏈接起來:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

注意,在不一樣的版本中,源碼也略有差別,head 不必定是 in + out,你們知道這點就行了。

還有,從上面的 head 和 tail 咱們也能夠看到,其實 pipeline 中的每一個元素是 ChannelHandlerContext 的實例,而不是 ChannelHandler 的實例,context 包裝了一下 handler,可是,後面咱們都會用 handler 來描述一個 pipeline 上的節點,而不是使用 context,但願讀者知道這一點。

這裏只是構造了 pipeline,而且添加了兩個固定的 handler 到其中(head + tail),還不涉及到自定義的 handler 代碼執行。咱們回過頭來看下面這段代碼:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

咱們說過 childHandler 中指定的 handler 不是給 NioServerSocketChannel 使用的,是給 NioSocketChannel 使用的,因此這裏咱們不看它。

這裏調用 handler(…) 方法指定了一個 LoggingHandler 的實例,而後咱們再進去下面的 bind(…) 方法中看看這個 LoggingHandler 實例是怎麼進入到咱們以前構造的 pipeline 內的。

順着 bind() 一直往前走,bind() -> doBind() -> initAndRegister():

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 構造 channel 實例,同時會構造 pipeline 實例,
// 如今 pipeline 中有 head 和 tail 兩個 handler 了
channel = channelFactory.newChannel();
// 2. 看這裏
init(channel);
} catch (Throwable t) {
......
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 構造 channel 實例,同時會構造 pipeline 實例,
// 如今 pipeline 中有 head 和 tail 兩個 handler 了
channel = channelFactory.newChannel();
// 2. 看這裏
init(channel);
} catch (Throwable t) {
......
}

上面的兩行代碼,第一行實現了構造 channel 和 channel 內部的 pipeline,咱們來看第二行 init 代碼:

// ServerBootstrap:

@Override
void init(Channel channel) throws Exception {
......
// 拿到剛剛建立的 channel 內部的 pipeline 實例
ChannelPipeline p = channel.pipeline();
...
// 開始往 pipeline 中添加一個 handler,這個 handler 是 ChannelInitializer 的實例
p.addLast(new ChannelInitializer<Channel>() {

// 咱們之後會看到,下面這個 initChannel 方法什麼時候會被調用
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 這個方法返回咱們最開始指定的 LoggingHandler 實例
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 LoggingHandler
pipeline.addLast(handler);
}

// 先不用管這裏的 eventLoop
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一個 handler 到 pipeline 中:ServerBootstrapAcceptor
// 從名字能夠看到,這個 handler 的目的是用於接收客戶端請求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
@Override
void init(Channel channel) throws Exception {
......
// 拿到剛剛建立的 channel 內部的 pipeline 實例
ChannelPipeline p = channel.pipeline();
...
// 開始往 pipeline 中添加一個 handler,這個 handler 是 ChannelInitializer 的實例
p.addLast(new ChannelInitializer<Channel>() {

// 咱們之後會看到,下面這個 initChannel 方法什麼時候會被調用
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 這個方法返回咱們最開始指定的 LoggingHandler 實例
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 LoggingHandler
pipeline.addLast(handler);
}

// 先不用管這裏的 eventLoop
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一個 handler 到 pipeline 中:ServerBootstrapAcceptor
// 從名字能夠看到,這個 handler 的目的是用於接收客戶端請求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

這裏涉及到 pipeline 中的輔助類 ChannelInitializer,咱們看到,它自己是一個 handler(Inbound 類型),可是它的做用和普通 handler 有點不同,它純碎是用來輔助將其餘的 handler 加入到 pipeline 中的。

你們能夠稍微看一下 ChannelInitializer 的 initChannel 方法,有個簡單的認識就好,此時的 pipeline 應該是這樣的:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

ChannelInitializer 的 initChannel(channel) 方法被調用的時候,會往 pipeline 中添加咱們最開始指定的 LoggingHandler 和添加一個 ServerBootstrapAcceptor。可是咱們如今還不知道這個 initChannel 方法什麼時候會被調用。

上面咱們說的是做爲服務端的 NioServerSocketChannel 的 pipeline,NioSocketChannel 也是差很少的,咱們能夠看一下 Bootstrap 類的 init(channel) 方法:

void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
...
}
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
...
}

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

它和服務端 ServerBootstrap 要添加 ServerBootstrapAcceptor 不同,它只須要將 EchoClient 類中的 ChannelInitializer 實例加進來就能夠了,它的 ChannelInitializer 中添加了兩個 handler,LoggingHandler 和 EchoClientHandler:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

很顯然,咱們須要的是像 LoggingHandler 和 EchoClientHandler 這樣的 handler,可是,它們如今還不在 pipeline 中,那麼它們何時會真正進入到 pipeline 中呢?之後咱們再揭曉。

還有,爲何 Server 端咱們指定的是一個 handler 實例,而 Client 指定的是一個 ChannelInitializer 實例?其實它們是能夠隨意搭配使用的,你甚至能夠在 ChannelInitializer 實例中添加 ChannelInitializer 的實例。

很是抱歉,這裏又要斷了,下面要先介紹線程池了,你們要記住 pipeline 如今的樣子,head + channelInitializer + tail。

本節沒有介紹 handler 的向後傳播,就是一個 handler 處理完了之後,怎麼傳遞給下一個 handler 來處理?好比咱們熟悉的 JavaEE 中的 Filter 是採用在一個 Filter 實例中調用 chain.doFilter(request, response) 來傳遞給下一個 Filter 這種方式的。

咱們用下面這張圖結束本節。下圖展現了傳播的方法,但我實際上是更想讓你們看一下,哪些事件是 Inbound 類型的,哪些是 Outbound 類型的:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

Outbound 類型的幾個事件你們應該比較好認,注意 bind 也是 Outbound 類型的。

Netty 中的線程池 EventLoopGroup

接下來,咱們來分析 Netty 中的線程池。Netty 中的線程池比較很差理解,由於它的類比較多,並且它們之間的關係錯綜複雜。看下圖,感覺下 NioEventLoop 類和 NioEventLoopGroup 類的繼承結構:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

這張圖我按照繼承關係整理而來,你們仔細看一下就會發現,涉及到的類確實挺多的。本節來給你們理理清楚這部份內容。

首先,咱們說的 Netty 的線程池,指的就是 NioEventLoopGroup 的實例;線程池中的單個線程,指的是右邊 NioEventLoop 的實例。

咱們第一節介紹的 Echo 例子,客戶端和服務端的啓動代碼中,最開始咱們老是先實例化 NioEventLoopGroup:

// EchoClient 代碼最開始:
EventLoopGroup group = new NioEventLoopGroup();

// EchoServer 代碼最開始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// EchoClient 代碼最開始:
EventLoopGroup group = new NioEventLoopGroup();

// EchoServer 代碼最開始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

下面,咱們就從 NioEventLoopGroup 的源碼開始進行分析。

咱們打開 NioEventLoopGroup 的源碼,能夠看到,NioEventLoopGroup 有多個構造方法用於參數設置,最簡單地,咱們採用無參構造函數,或僅僅設置線程數量就能夠了,其餘的參數採用默認值。

好比上面的代碼中,咱們只在實例化 bossGroup 的時候指定了參數,表明該線程池須要一個線程。

public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

...

// 參數最全的構造方法
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
// 調用父類的構造方法
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

...

// 參數最全的構造方法
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
// 調用父類的構造方法
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}

咱們來稍微看一下構造方法中的各個參數:

  • nThreads:這個最簡單,就是線程池中的線程數,也就是 NioEventLoop 的實例數量。

  • executor:咱們知道,咱們自己就是要構造一個線程池(Executor),爲何這裏傳一個 executor 實例呢?它其實不是給線程池用的,而是給 NioEventLoop 用的,之後再說。

  • chooserFactory:當咱們提交一個任務到線程池的時候,線程池須要選擇(choose)其中的一個線程來執行這個任務,這個就是用來實現選擇策略的。

  • selectorProvider:這個簡單,咱們須要經過它來實例化 JDK 的 Selector,能夠看到每一個線程池都持有一個 selectorProvider 實例。

  • selectStrategyFactory:這個涉及到的是線程池中線程的工做流程,在介紹 NioEventLoop 的時候會說。

  • rejectedExecutionHandler:這個也是線程池的好朋友了,用於處理線程池中沒有可用的線程來執行任務的狀況。在 Netty 中稍微有一點點不同,這個是給 NioEventLoop 實例用的,之後咱們再詳細介紹。

這裏介紹這些參數是但願你們有個印象而已,你們發現沒有,在構造 NioEventLoopGroup 實例時的好幾個參數,都是用來構造 NioEventLoop 用的。

下面,咱們從 NioEventLoopGroup 的無參構造方法開始,跟着源碼走:

public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup() {
this(0);
}

而後一步步走下去,到這個構造方法:

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {

super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {

super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

你們本身要去跟一下源碼,這樣才知道中間設置了哪些默認值,下面這幾個參數都被設置了默認值:

  • selectorProvider = SelectorProvider.provider()

    這個沒什麼好說的,調用了 JDK 提供的方法

  • selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE

    這個涉及到的是線程在作 select 操做和執行任務過程當中的策略選擇問題,在介紹 NioEventLoop 的時候會用到。

  • rejectedExecutionHandler = RejectedExecutionHandlers.reject()

    你們進去看一下 reject() 方法,也就是說,Netty 選擇的默認拒絕策略是:拋出異常

跟着源碼走,咱們會來到父類 MultithreadEventLoopGroup 的構造方法中:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

這裏咱們發現,若是採用無參構造函數,那麼到這裏的時候,默認地 nThreads 會被設置爲 CPU 核心數 *2。你們能夠看下 DEFAULT_EVENT_LOOP_THREADS 的默認值,以及 static 代碼塊的設值邏輯。

咱們繼續往下走:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}

到這一步的時候,new ThreadPerTaskExecutor(threadFactory) 會構造一個 executor。

咱們如今還不知道這個 executor 怎麼用。這裏咱們先看下它的源碼:

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
// 爲每一個任務新建一個線程
threadFactory.newThread(command).start();
}
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
// 爲每一個任務新建一個線程
threadFactory.newThread(command).start();
}
}

Executor 做爲線程池的最頂層接口, 咱們知道,它只有一個 execute(runnable) 方法,從上面咱們能夠看到,實現類 ThreadPerTaskExecutor 的邏輯就是每來一個任務,新建一個線程。

咱們先記住這個,前面也說了,它是給 NioEventLoop 用的,不是給 NioEventLoopGroup 用的。

上一步設置完了 executor,咱們繼續往下看:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

這一步設置了 chooserFactory,用來實現從線程池中選擇一個線程的選擇策略。

ChooserFactory 的邏輯比較簡單,咱們看下 DefaultEventExecutorChooserFactory 的實現:

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

這裏設置的策略也很簡單:

一、若是線程池的線程數量是 2^n,採用下面的方式會高效一些:

@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}

二、若是不是,用取模的方式:

@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

走了這麼久,咱們終於到了一個幹實事的構造方法中了:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

// executor 若是是 null,作一次和前面同樣的默認設置。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

// 這裏的 children 數組很是重要,它就是線程池中的線程數組,這麼說不太嚴謹,可是就大概這個意思
children = new EventExecutor[nThreads];

// 下面這個 for 循環將實例化 children 數組中的每個元素
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 實例化!!!!!!
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 若是有一個 child 實例化失敗,那麼 success 就會爲 false,而後進入下面的失敗處理邏輯
if (!success) {
// 把已經成功實例化的「線程」 shutdown,shutdown 是異步操做
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

// 等待這些線程成功 shutdown
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// 把中斷狀態設置回去,交給關心的線程來處理.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// ================================================
// === 到這裏,就是表明上面的實例化全部線程已經成功結束 ===
// ================================================

// 經過以前設置的 chooserFactory 來實例化 Chooser,把線程池數組傳進去,
// 這就沒必要再說了吧,實現線程選擇策略
chooser = chooserFactory.newChooser(children);

// 設置一個 Listener 用來監聽該線程池的 termination 事件
// 下面的代碼邏輯是:給池中每個線程都設置這個 listener,當監聽到全部線程都 terminate 之後,這個線程池就算真正的 terminate 了。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

// 設置 readonlyChildren,它是隻讀集合,之後用到再說
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

// executor 若是是 null,作一次和前面同樣的默認設置。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

// 這裏的 children 數組很是重要,它就是線程池中的線程數組,這麼說不太嚴謹,可是就大概這個意思
children = new EventExecutor[nThreads];

// 下面這個 for 循環將實例化 children 數組中的每個元素
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 實例化!!!!!!
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 若是有一個 child 實例化失敗,那麼 success 就會爲 false,而後進入下面的失敗處理邏輯
if (!success) {
// 把已經成功實例化的「線程」 shutdown,shutdown 是異步操做
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

// 等待這些線程成功 shutdown
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// 把中斷狀態設置回去,交給關心的線程來處理.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// ================================================
// === 到這裏,就是表明上面的實例化全部線程已經成功結束 ===
// ================================================

// 經過以前設置的 chooserFactory 來實例化 Chooser,把線程池數組傳進去,
// 這就沒必要再說了吧,實現線程選擇策略
chooser = chooserFactory.newChooser(children);

// 設置一個 Listener 用來監聽該線程池的 termination 事件
// 下面的代碼邏輯是:給池中每個線程都設置這個 listener,當監聽到全部線程都 terminate 之後,這個線程池就算真正的 terminate 了。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

// 設置 readonlyChildren,它是隻讀集合,之後用到再說
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

上面的代碼很是簡單吧,沒有什麼須要特別說的,接下來,咱們來看看 newChild() 這個方法,這個方法很是重要,它將建立線程池中的線程。

我上面已經用過不少次"線程"這個詞了,它可不是 Thread 的意思,而是指池中的個體,後面咱們會看到每一個"線程"在何時會真正建立 Thread 實例。反正每一個 NioEventLoop 實例內部都會有一個本身的 Thread 實例,因此把這兩個概念混在一塊兒也無所謂吧。

newChild(…) 方法在 NioEventLoopGroup 中覆寫了,上面說的"線程"其實就是 NioEventLoop:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

它調用了 NioEventLoop 的構造方法:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 調用父類構造器
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 開啓 NIO 中最重要的組件:Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 調用父類構造器
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 開啓 NIO 中最重要的組件:Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

咱們先粗略觀察一下,而後再往下看:

  • 在 Netty 中,NioEventLoopGroup 表明線程池,NioEventLoop 就是其中的線程。

  • 線程池 NioEventLoopGroup 是池中的線程 NioEventLoop 的 parent,從上面的代碼中的取名能夠看出。

  • 每一個 NioEventLoop 都有本身的 Selector,上面的代碼也反應了這一點,這和 Tomcat 中的 NIO 模型有點區別。

  • executor、selectStrategy 和 rejectedExecutionHandler 從 NioEventLoopGroup 中一路傳到了 NioEventLoop 中。

這個時候,咱們來看一下 NioEventLoop 類的屬性都有哪些,咱們先忽略它繼承自父類的屬性,單單看它本身的:

private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

private final AtomicBoolean wakenUp = new AtomicBoolean();

private final SelectStrategy selectStrategy;

private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

private final AtomicBoolean wakenUp = new AtomicBoolean();

private final SelectStrategy selectStrategy;

private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;

結合它的構造方法咱們來總結一下:

  • provider:它由 NioEventLoopGroup 傳進來,前面咱們說了一個線程池有一個 selectorProvider,用於建立 Selector 實例

  • selector:雖然咱們還沒看建立 selector 的代碼,但咱們已經知道,在 Netty 中 Selector 是跟着線程池中的線程走的。也就是說,並不是一個線程池一個 Selector 實例,而是線程池中每個線程都有一個 Selector 實例。

  • selectStrategy:select 操做的策略,這個不急。

  • ioRatio:這是 IO 任務的執行時間比例,由於每一個線程既有 IO 任務執行,也有非 IO 任務須要執行,因此該參數爲了保證有足夠時間是給 IO 的。這裏也不須要急着去理解什麼 IO 任務、什麼非 IO 任務。

而後咱們繼續走它的構造方法,咱們看到上面的構造方法調用了父類的構造器,它的父類是 SingleThreadEventLoop。

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

// 咱們能夠直接忽略這個東西,之後咱們也不會再介紹它
tailTasks = newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

// 咱們能夠直接忽略這個東西,之後咱們也不會再介紹它
tailTasks = newTaskQueue(maxPendingTasks);
}

SingleThreadEventLoop 這個名字很詭異有沒有?而後它的構造方法又調用了父類 SingleThreadEventExecutor 的構造方法:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// taskQueue,這個東西很重要,提交給 NioEventLoop 的任務都會進入到這個 taskQueue 中等待被執行
// 這個 queue 的默認容量是 16
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// taskQueue,這個東西很重要,提交給 NioEventLoop 的任務都會進入到這個 taskQueue 中等待被執行
// 這個 queue 的默認容量是 16
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

到這裏就更加詭異了,NioEventLoop 的父類是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父類是 SingleThreadEventExecutor,它的名字告訴咱們,它是一個 Executor,是一個線程池,並且是 Single Thread 單線程的。

也就是說,線程池 NioEventLoopGroup 中的每個線程 NioEventLoop 也能夠當作一個線程池來用,只不過它只有一個線程。這種設計雖然看上去很巧妙,不過有點反人類的樣子。

上面這個構造函數比較簡單:

  • 設置了 parent,也就是以前建立的線程池 NioEventLoopGroup 實例

  • executor:它是咱們以前實例化的 ThreadPerTaskExecutor,咱們說過,這個東西在線程池中沒有用,它是給 NioEventLoop 用的,立刻咱們就要看到它了。提早透露一下,它用來開啓 NioEventLoop 中的線程(Thread 實例)。

  • taskQueue:這算是該構造方法中新的東西,它是任務隊列。咱們前面說過,NioEventLoop 須要負責 IO 事件和非 IO 事件,一般它都在執行 selector 的 select 方法或者正在處理 selectedKeys,若是咱們要 submit 一個任務給它,任務就會被放到 taskQueue 中,等它來輪詢。該隊列是線程安全的 LinkedBlockingQueue,默認容量爲 16。

  • rejectedExecutionHandler:taskQueue 的默認容量是 16,因此,若是 submit 的任務堆積了到了 16,再往裏面提交任務會觸發 rejectedExecutionHandler 的執行策略。

    還記得默認策略嗎:拋出RejectedExecutionException 異常。

    在 NioEventLoopGroup 的默認構造中,它的實現是這樣的:

     private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
    throw new RejectedExecutionException();
    }
    };
    private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
    throw new RejectedExecutionException();
    }
    };

而後,咱們再回到 NioEventLoop 的構造方法:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 咱們剛剛說完了這個
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 建立 selector 實例
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;

selectStrategy = strategy;
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 咱們剛剛說完了這個
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 建立 selector 實例
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;

selectStrategy = strategy;
}

能夠看到,最重要的方法其實就是 openSelector() 方法,它將建立 NIO 中最重要的一個組件 Selector。在這個方法中,Netty 也作了一些優化,這部分咱們就不去分析它了。

到這裏,咱們的線程池 NioEventLoopGroup 建立完成了,而且實例化了池中的全部 NioEventLoop 實例。

同時,你們應該已經看到,上面並無真正建立 NioEventLoop 中的線程(沒有建立 Thread 實例)。

提早透露一下,建立線程的時機在第一個任務提交過來的時候,那麼第一個任務是什麼呢?是咱們立刻要說的 channel 的 register 操做。

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

相關文章
相關標籤/搜索