有情懷,有乾貨,微信搜索【三太子敖丙】關注這個不同的程序員。java
本文 GitHub github.com/JavaFamily 已收錄,有一線大廠面試完整考點、資料以及個人系列文章。git
高能預警,本文是我一個月前就開始寫的,因此內容會很是長,固然也很是硬核,dubbo源碼系列結束以後我就想着寫一下netty系列的,可是netty的源碼概念又很是多,因此才寫到了如今。程序員
我相信90%的讀者都不會一口氣看完的,由於實在太長了,長到我如今頂配的mbp打字編輯框都是卡的,可是我但願你們往後想看netty或者在面試前須要瞭解的朋友回頭翻一下就夠了,那我寫這個文章的意義也就有了。github
也很少BB,直接開整。面試
阻塞和非阻塞是進程在訪問數據的時候,數據是否準備就緒的一種處理方式,當數據沒有準備的時候。編程
阻塞:每每須要等待緩衝區中的數據準備好事後才處理其餘的事情,不然一直等待在那裏。bootstrap
非阻塞:當咱們的進程訪問咱們的數據緩衝區的時候,若是數據沒有準備好則直接返回,不會等待。若是數據已經準備好,也直接返回。數組
阻塞 IO :promise
非阻塞 IO :服務器
同步和異步都是基於應用程序和操做系統處理 IO 事件所採用的方式。好比
**同步:**是應用程序要直接參與 IO 讀寫的操做。
**異步:**全部的 IO 讀寫交給操做系統去處理,應用程序只須要等待通知。
同步方式在處理 IO 事件的時候,必須阻塞在某個方法上面等待咱們的 IO 事件完成(阻塞 IO 事件或者經過輪詢 IO事件的方式),對於異步來講,全部的 IO 讀寫都交給了操做系統。這個時候,咱們能夠去作其餘的事情,並不須要去完成真正的 IO 操做,當操做完成 IO 後,會給咱們的應用程序一個通知。
因此異步相比較於同步帶來的直接好處就是在咱們處理IO數據的時候,異步的方式咱們能夠把這部分等待所消耗的資源用於處理其餘事務,提高咱們服務自身的性能。
同步 IO :
異步 IO :
BIO是一個同步並阻塞的IO模式,傳統的 java.io 包,它基於流模型實現,提供了咱們最熟知的一些 IO 功能,好比File抽象、輸入輸出流等。交互方式是同步、阻塞的方式,也就是說,在讀取輸入流或者寫入輸出流時,在讀、寫動做完成以前,線程會一直阻塞在那裏,它們之間的調用是可靠的線性順序。
NIO 是一種同步非阻塞的 I/O 模型,於 Java 1.4 中引入,對應 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 能夠理解爲 Non-blocking,不單純是 New。它支持面向緩衝的,基於通道的 I/O 操做方法。 NIO 提供了與傳統 BIO 模型中的 Socket
和 ServerSocket
相對應的 SocketChannel
和 ServerSocketChannel
兩種不一樣的套接字通道實現,兩種通道都支持阻塞和非阻塞兩種模式。對於高負載、高併發的(網絡)應用,應使用 NIO 的非阻塞模式來開發
IO模型 | BIO | NIO |
---|---|---|
通訊 | 面向流 | 面向緩衝 |
處理 | 阻塞 IO | 非阻塞 IO |
觸發 | 無 | 選擇器 |
Netty 是一個 NIO 客戶端服務器框架,可快速輕鬆地開發網絡應用程序,例如協議服務器和客戶端。它極大地簡化和簡化了網絡編程,例如 TCP 和 UDP 套接字服務器。
「快速簡便」並不意味着最終的應用程序將遭受可維護性或性能問題的困擾。Netty 通過精心設計,結合了許多協議(例如FTP,SMTP,HTTP 以及各類基於二進制和文本的舊式協議)的實施經驗。結果,Netty 成功地找到了一種無需妥協便可輕鬆實現開發,性能,穩定性和靈活性的方法。
Channel是 Java NIO 的一個基本構造。能夠看做是傳入或傳出數據的載體。所以,它能夠被打開或關閉,鏈接或者斷開鏈接。
EventLoop 定義了Netty的核心抽象,用來處理鏈接的生命週期中所發生的事件,在內部,將會爲每一個Channel分配一個EventLoop。
EventLoopGroup 是一個 EventLoop 池,包含不少的 EventLoop。
Netty 爲每一個 Channel 分配了一個 EventLoop,用於處理用戶鏈接請求、對用戶請求的處理等全部事件。EventLoop 自己只是一個線程驅動,在其生命週期內只會綁定一個線程,讓該線程處理一個 Channel 的全部 IO 事件。
一個 Channel 一旦與一個 EventLoop 相綁定,那麼在 Channel 的整個生命週期內是不能改變的。一個 EventLoop 能夠與多個 Channel 綁定。即 Channel 與 EventLoop 的關係是 n:1,而 EventLoop 與線程的關係是 1:1。
Bootstarp 和 ServerBootstrap 被稱爲引導類,指對應用程序進行配置,並使他運行起來的過程。Netty處理引導的方式是使你的應用程序和網絡層相隔離。
Bootstrap 是客戶端的引導類,Bootstrap 在調用 bind()(鏈接UDP)和 connect()(鏈接TCP)方法時,會新建立一個 Channel,僅建立一個單獨的、沒有父 Channel 的 Channel 來實現全部的網絡交換。
ServerBootstrap 是服務端的引導類,ServerBootstarp 在調用 bind() 方法時會建立一個 ServerChannel 來接受來自客戶端的鏈接,而且該 ServerChannel 管理了多個子 Channel 用於同客戶端之間的通訊。
ChannelHandler 是對 Channel 中數據的處理器,這些處理器能夠是系統自己定義好的編解碼器,也能夠是用戶自定義的。這些處理器會被統一添加到一個 ChannelPipeline 的對象中,而後按照添加的順序對 Channel 中的數據進行依次處理。
Netty 中全部的 I/O 操做都是異步的,即操做不會當即獲得返回結果,因此 Netty 中定義了一個 ChannelFuture 對象做爲這個異步操做的「代言人」,表示異步操做自己。若是想獲取到該異步操做的返回值,能夠經過該異步操做對象的addListener() 方法爲該異步操做添加監 NIO 網絡編程框架 Netty 聽器,爲其註冊回調:當結果出來後立刻調用執行。
Netty 的異步編程模型都是創建在 Future 與回調概念之上的。
源碼閱讀,最好能夠再 Debug 的狀況下進行,這樣更容易幫助理解,所以在分析 Netty 前的我準備一個客戶端和服務端的代碼。
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new SomeSocketServerHandler());
}
});
ChannelFuture future = bootstrap.bind(8888).sync();
System.out.println("服務器已啓動。。。");
future.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
複製代碼
public class DemoSocketServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
ctx.fireChannelActive();
TimeUnit.MILLISECONDS.sleep(500);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
複製代碼
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new DemoSocketClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
} finally {
if(eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
}
}
複製代碼
public class DemoSocketClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(5000);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush("from client:begin talking");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
複製代碼
首先根據 Server 服務端代碼,分析 NioEventLoopGroup 的初始化過程。而在分析 NioEventLoopGroup 以前,有必要簡單的說一說 NioEventLoopGroup 與 NioEventLoop ,方便後續源碼的理解。
NioEventLoop 的繼承體系
從 NioEventLoop 的繼承體系中能夠看到,NioEventLoop 自己就是一個 Executor,而且仍是一個 單線程的 Executor。Executor 必然擁有一個 execute(Runnable command)
的實現方法,而 NioEventLoop 的 execute()
實現方法在其父類 SingleThreadEventExecutor 中,找到具體代碼:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
複製代碼
這裏不細說,可是貼出這段代碼主要爲了引出 startThread();
這句代碼,在跟這句代碼會發現,它最終調用了 NioEventLoop 的一個成員 Executor 執行了當前成員的 execute()
方法。對應的成員 io.netty.util.concurrent.SingleThreadEventExecutor#executor
而 executor 成員的初始化也是在當前代碼執行時建立的匿名 Executor ,也就是執行到即新建而且執行當前 匿名 executr()
方法。
總結:
execute
方法,除了自己的 execute()
方法對應的還有成員屬性 Executor 對應的 execute()
方法。備註: 由於這裏出現了四個 Executor,爲了區分,咱們給其新的名稱:
NioEventLoop 自己 Executor:NioEventLoop
NioEventLoop 的成員 Executor:子 Executor
NioEventLoopGroup 自己 Executor :NioEventLoopGroup
NioEventLoopGroup 的構造參數 Executor :總Executor
NioEventLoopGroup 的繼承體系
看到繼承體系能夠直接知道 NioEventLoopGroup 也是一個 Executor,而且是一個線程池的 Executor,因此他也有 execute()
方法。對應的實現再其父類之中:io.netty.util.concurrent.AbstractEventExecutorGroup#execute
而這裏還須要說到的一點是:在 NioEventLoopGroup 的構造中,再其父類 MultithreadEventExecutorGroup 的構造再次引入了一個新的 Executor,
之因此這裏提到這個 Executor,是由於這個 Executor 是對應的 execute()
就是在 NioEventLoop 中的成員 Executor 的 execute()
執行時調用的。也就是下面對應的代碼調用。io.netty.util.internal.ThreadExecutorMap#apply(java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutor)
到這若是不明白,不要緊,由於只是爲了引入 NioEventLoopGroup 和 NioEventLoop 的對應的兩個 Executor,和兩個 Executor 對應的兩個 execute()
方法。這個後面還會有詳細分析。
總結:
execute()
方法。上面說了基本的瞭解內容,下面具體分析,從 NioEventLoopGroup 的初始化進入源碼分析。
入口咱們直接找 NioEventLoopGroup 的無參構造。
public NioEventLoopGroup() {
this(0);
}
複製代碼
public NioEventLoopGroup(int nThreads) {
// 第二個參數是這個group所包含的executor
this(nThreads, (Executor) null);
}
複製代碼
public NioEventLoopGroup(int nThreads, Executor executor) {
// 第三個參數是provider,其用於提供selector及selectable的channel,
// 這個provider是當前JVM中惟一的一個單例的provider
this(nThreads, executor, SelectorProvider.provider());
}
複製代碼
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 第四個參數是一個選擇策略工廠實例
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
複製代碼
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
複製代碼
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
複製代碼
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 第三個參數是選擇器工廠實例
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
複製代碼
跟到此,能夠發現無參構造的基本參數被初始化, nThreads :DEFAULT_EVENT_LOOP_THREADS//默認當前CPU邏輯核心數的兩倍
,selectorProvide:SelectorProvider.provider()//當前JVM中惟一的一個單例的provider
,SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默認選擇策略工廠實例
,chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//選擇器工廠實例
。到這裏只是基本的初始化參數,重點方法爲MultithreadEventExecutorGroup
的構造方法。下面重點分析:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 這個executor是group所包含的executor,其未來會爲其所包含的每一個eventLoop建立一個線程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 建立eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在建立這些eventLoop過程當中,只要有一個建立失敗,則關閉以前全部已經建立好的eventLoop
if (!success) {
// 關閉以前全部已經建立好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 終止全部eventLoop上所執行的任務
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 建立一個選擇器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
複製代碼
根據無參構造直接往下跟,能夠看到核心部分在最後一個父類的構造裏。也就是 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
。
再這裏完成整個 NioEventLoopGroup 的實例初始化,這裏分析下,而後再畫個圖回顧下。
初始化構造參數中的 Executor 參數,當其爲空時,將其初始化
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
複製代碼
首先 newDefaultThreadFactory())
建立默認的線程工廠,有興趣能夠跟進去看看。而後再建立ThreadPerTaskExecutor
線程 Executor 對象。(PS:這裏建立的 Executor 就是 NioEventLoopGroup 內的 Executor 對象,並非當前 NioEventLoopGroup 自身,能夠稱其爲 總 Executor)。
而後能夠看到這裏建立了一個 children 數組,根據須要建立的線程數建立對應數量的數組。
children = new EventExecutor[nThreads];
複製代碼
由於每一個 NioEventLoopGroup 都是 NioEventLoop 的集合,因此這裏的 children 數組就是當前 NioEventLoopGroup 的 NioEventLoop。因此 NioEventLoop 的建立的實在 NioEventLoopGroup 初始化的時候。下面看 NioEventLoop 的初始化:
// 逐個建立nioEventLoop實例
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 建立eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在建立這些eventLoop過程當中,只要有一個建立失敗,則關閉以前全部已經建立好的eventLoop
if (!success) {
// 閉以前全部已經建立好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 終止全部eventLoop上所執行的任務
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
複製代碼
先總體看這段 NioEventLoop 的建立代碼,能夠看到整個過程當中存在一個成功標誌,catch 每一個 NioEventLoop 建立完成過程,若是發生異常則將全部已經建立的 NioEventLoop 關閉。重點的代碼也就在 NioEventLoop 的建立了。因此咱們繼續跟:children[i] = newChild(executor, args);
往下走,直接找到 io.netty.channel.nio.NioEventLoopGroup#newChild
,由於當前是 NioEventLoopGroup 的建立,因此知道找到子類的 newChild
實現。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
複製代碼
又將以前合併的 args 參數強轉回來,繼續跟進 NioEventLoop 構造:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 建立一個selector的二元組
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
複製代碼
這裏咱們先總體看下,將以前的默認參數初始化到 NioEventLoop 屬性中。其中有兩處:openSelector()
和 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)
。這裏先看父類構造:
往下跟,直接就是 SingleThreadEventLoop -> SingleThreadEventExecutor 的初始化,這些也能夠在 NioEventLoop 的繼承體系能夠看到:
// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 建立一個收尾隊列
tailTasks = newTaskQueue(maxPendingTasks);
}
// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 這是當前NioEventLoop所包含的executor
this.executor = ThreadExecutorMap.apply(executor, this);
// 建立一個任務隊列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
複製代碼
這裏首先建立的是 SingleThreadEventExecutor ,這裏重點須要關注的代碼是:
this.executor = ThreadExecutorMap.apply(executor, this);
複製代碼
這裏this
是 NioEventLoop ,因此this.executor
就是前面說的 NioEventLoop 裏的 Executor,這裏咱們先稱爲 子 Executor(子:對應的就是 NioEventLoop ,前面說的 總:對應的是 NioEventLoopGroup )。
而這裏 子 Executor 的初始化是由一個 executor
參數的,這個就是前面 NioEventLoopGroup 構造方法一直帶入的 總 Executor。那咱們繼續往下跟,看看這個子 Executor 是如何完成的初始化的。
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 這裏建立的executor是子executor
return new Executor() {
// 這個execute()是子executor的execute()
@Override
public void execute(final Runnable command) {
// 這裏調用了NioEventLoopGroup所包含的executor的execute()
// 即調用了「總的executor」的execute()
executor.execute(apply(command, eventExecutor));
}
};
}
複製代碼
這段代碼細看就會明白,這裏建立的 子 Executor的建立也就是一個線程的建立,可是重點卻在這個線程 Executor 的 execute()
方法實現,只作了一件事情:就是調用 傳入的 總 Executor 的 execute()
方法。因此這裏 子 Executor 作的事情就是調用 總 Executor 的 execute()
。不要以爲這裏繞,由於這還只是初始化,後面這裏執行會更繞。[手動捂臉哭]
其實這裏的 apply(command, eventExecutor)
,這裏再執行 總 Executor 的 execute()
時仍是會記錄當前正在執行的線程,而且再執行完成時將當前記錄值刪除。
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
複製代碼
這裏再 NioEventLoop 的屬性 Executor 建立完成時,又去建立了一個普通任務隊列taskQueue = newTaskQueue(this.maxPendingTasks);
而且還建立了一個收尾任務隊列tailTasks = newTaskQueue(maxPendingTasks);
。這幾個隊列後面會說到。這裏繼續跟 NioEventLoop 主流程初始化。
到這咱們再回去看看 openSelector()
,這裏咱們要先知道 SelectorTuple :
private static final class SelectorTuple {
final Selector unwrappedSelector; // NIO原生selector
final Selector selector; // 優化過的selector
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
複製代碼
SelectorTuple 只是一個包含兩個 Selector 的內部類,用於封裝優化先後的 Selector。而 openSelector()
方法就是爲了返回 Selector 而且根據配置判斷是否須要優化當前 Selector 。下面看具體代碼:
而具體的優化過程有興趣的能夠本身去看看,這裏只要知道,如果禁用了優化則 SelectorTuple 的優化後的 Selector 和爲優化的 Selector 均爲 Nio 原生的 Selector。
而這io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
後面還有在 NioEventLoop 數組建立完成後,還有選擇器建立和關閉監聽器綁定等,感興趣能夠本身看看,這裏再也不介紹。
到這一個 NioEventLoop 的建立過程的代碼也所有看完了。我想若是隻看這個確定仍是有點懵,源碼這個東西須要本身跟進去去看,debug 一點點的跟,跟着運行的代碼去想爲什麼這麼實現,不過這裏我也畫個圖,讓你們更直觀的瞭解到 NioEventLoopGroup 的建立流程以及主要操做。
我想你們結合這個圖,再結合上面的分析過程,最好能夠本身找到源碼,跟一遍,應該能夠理解 NioEvnetLoopGroup 的建立。
繼承體系:
入口代碼:
//2.建立服務端啓動引導/輔助類:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.給引導類配置兩大線程組,肯定了線程模型
b.group(bossGroup, workerGroup)
// (非必備)打印日誌
.handler(new LoggingHandler(LogLevel.INFO))
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.能夠自定義客戶端消息的業務處理邏輯
p.addLast(new HelloServerHandler());
}
});
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new SomeSocketClientHandler());
}
});
複製代碼
ServerBootstrap與 Bootstrap 都是啓動配置類,惟一不一樣的是,ServerBootstrap是服務端的啓動配置類,Bootstrap 則是客戶端的啓動配置類,主要用於綁定咱們建立的 EventLoopGroup,指定 Channel 的類型以及綁定 Channel 處理器等操做,主要作的都是給 ServerBootstrap與 Bootstrap 的屬性賦值操做,因此稱其爲配置類。能夠進入 group()
方法裏看一眼:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
複製代碼
其餘的方法也是同樣,感興趣能夠本身進去看看。這裏只是初始化,都是爲了後面的操做作準備。
這裏咱們從這裏進入:
b.bind(port).sync();
複製代碼
直接從 bind()
方法跟進去:
// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// 繼續跟進
public ChannelFuture bind(SocketAddress localAddress) {
// 驗證group與channelFactory是否爲null
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// 這裏是一處重點邏輯
return doBind(localAddress);
}
複製代碼
這裏顯示校驗了 Bootstrap 的 group 與 channelFactory 是否綁定成功。而後繼續跟進 doBind()
方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 建立、初始化channel,並將其註冊到selector,返回一個異步結果
final ChannelFuture regFuture = initAndRegister();
// 從異步結果中獲取channel
final Channel channel = regFuture.channel();
// 若異步操做執行過程當中出現了異常,則直接返回異步對象(直接結束)
if (regFuture.cause() != null) {
return regFuture;
}
// 處理異步操做完成的狀況(多是正常結束,或發生異常,或任務取消,這些狀況都屬於有結果的狀況)
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 綁定指定的端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 處理異步操做還沒有有結果的狀況
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 爲異步操做添加監聽
regFuture.addListener(new ChannelFutureListener() {
// 若異步操做具備告終果(即完成),則觸發該方法的執行
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) { // 異步操做執行過程當中出現了問題
promise.setFailure(cause);
} else { // 異步操做正常結果
promise.registered();
// 綁定指定的端口
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
複製代碼
首先再這裏,咱們先把這個方法總體的邏輯搞清楚,而後再再去研究他的每一步具體的操做,畫個圖,先理解這個方法作了什麼:
能夠在圖中結合代碼,找到整個 dobind()
的大局處理思路,而後呢,到這裏咱們還有不少重點細節須要繼續跟進,也就是圖中標記的 Tag 一、Tag 2。爲了方便後面跟進去代碼以後方便回來,這裏以此標記,而後下面在具體分析 Tag 標記的源碼:
補充 Tag 0 :
ChannelPromise 與 ChannelFuture 瞭解。
Tag 1 :
異步建立、初始化channel,並將其註冊到selector
final ChannelFuture regFuture = initAndRegister();
Tag 2 :
綁定指定的端口號:
doBind0(regFuture, channel, localAddress, promise);
ChannelPromise 是一個特殊的 ChannelFuture,是一個可修改的 ChannelFuture。內部提供了修改當前 Future 狀態的方法。在 ChannelFuture 的基礎上實現了設置最終狀態的修改方法。
而 ChannelFuture 只能夠查詢當前異步操做的結果,不能夠修改當前異步結果的 Future 。這裏須要知道的就是 ChannelPromise 能夠修改當前異步結果的狀態,而且在修改狀態是會觸發監聽器。在 doBind
方法中主要用於在處理異步執行一直未結束的的操做,將異步結果存在異常的時,將異常賦值給 ChannelPromise 並返回。
先找到代碼:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 建立channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 將channel註冊到selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
複製代碼
嗯?!代碼意一看,咋就這麼點,也就作了三件事,但是這三件事作的每個都不是一句代碼的能夠完成的。這裏咱們一個一個分析,除了這三件事情,其餘的也就是異常後的處理邏輯,因此主流程就是下面的三句代碼,也爲了跟進繼續打上標記吧:
Tag 1.1 建立channel channel = channelFactory.newChannel();
Tag 1.2 初始化channel init(channel);
Tag 1.3 將channel註冊到selector ChannelFuture regFuture = config().group().register(channel);
針對這三處,仍是要一處一處分析。
找到對應的代碼:io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
try {
// 調用無參構造器建立channel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
複製代碼
這裏爲何直接找到 ReflectiveChannelFactory ,須要提一下,在分析 ServerBootstrap與 Bootstrap 啓動配置類的時候,設置 channel 的方法,跟進去能夠找到針對屬性 channelFactory 的賦值代碼:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
複製代碼
能夠看到這裏 new 的就是 ReflectiveChannelFactory 工廠類,而後再看 ReflectiveChannelFactory 的構造:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// 將NioServerSocketChannel的無參構造器初始化到constructor
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
複製代碼
看到的是 ReflectiveChannelFactory 在建立時初始化了 constructor 屬性,將傳入的 channel 類 clazz 中獲取構造賦值給了 ReflectiveChannelFactory 反射工廠的 constructor 屬性。
而咱們再 Server 端傳入的 channel 類爲NioServerSocketChannel.class
,因此上面看的 constructor.newInstance();
對應的也就是 NioServerSocketChannel 的無參構造。這樣咱們就繼續跟進 NioServerSocketChannel :
// NIO中的provider,其用於建立selector與channel。而且是單例的
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
// DEFAULT_SELECTOR_PROVIDER 靜態變量
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
複製代碼
繼續跟進 newSocket()
:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 建立NIO原生的channel => ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
複製代碼
就是返回了一個 Java NIO 原生的 Channel,最後將 NIO 原生的Channel 包裝成 NioServerSocketChannel,繼續跟進 this(newSocket(DEFAULT_SELECTOR_PROVIDER))
找到有參構造具體代碼:
public NioServerSocketChannel(ServerSocketChannel channel) {
// 參數1:父channel
// 參數2:NIO原生channel
// 參數3:指定當前channel所關注的事件爲 接受鏈接
super(null, channel, SelectionKey.OP_ACCEPT);
// 用於對channel進行配置的屬性集合
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
複製代碼
這裏主要作了兩件事情,1. 調用父類構造,2. 對 channel 進行配置屬性集合。
這裏先說下 new NioServerSocketChannelConfig(),這部操做就是給當前 Channel 的 config 進行賦值,用來保存當前 Channel 的屬性配置的集合。好了,這個說了咱們繼續跟主線:super(null, channel, SelectionKey.OP_ACCEPT)
// io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// 這裏的this.ch爲NIO原生channel
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// NIO,非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
複製代碼
直接找到 AbstractNioChannel 父類構造,這也第一步也是調用父類構造 super(parent);
先記着,先看除了調用父類構造還作了什麼事情:
- 調用父類構造 super(parent);
- 將前面建立的原生 Channel 複製給屬性保存 this.ch = ch;
- 當前 channel 的關注事件屬性賦值 this.readInterestOp = readInterestOp; // SelectionKey.OP_ACCEPT 接受事件
- 將 NIO 原生 Channel 設置爲非阻塞 ch.configureBlocking(false);
在 AbstractNioChannel 構造中就作了這麼四件事情,主要須要說的仍是其調用父類構造又作了什麼事情,找到代碼:
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 爲channel生成id,由五部分構成
id = newId();
// 生成一個底層操做對象unsafe
unsafe = newUnsafe();
// 建立與這個channel相綁定的channelPipeline
pipeline = newChannelPipeline();
}
複製代碼
在 AbstractChannel 構造中主要作了三件事:
- 爲當前 Channel 生成 id
newId()
,感興趣能夠跟進去看看。- 生成一個底層操做對象 unsafe,用於 I/O 線程調用傳輸時使用,用戶代碼沒法調用。
newUnsafe()
- 建立與這個channel相綁定的channelPipeline,這也是一個重點操做,不過在這裏先不展開細說,後面會單獨細跟 channelPipeline 的代碼。
因此到此 **Tag 1 : initAndRegister() ** 中的 **Tag 1.1 newChannel() ** 建立 Channel 纔算跟完。針對 Tag 1.1 newChannel() 咱們也畫圖簡圖整理下思路:
根據圖,在結合上面代碼的分析,最好本身再能夠跟一遍代碼,我想這一塊的理解仍是沒什麼問題的。到這也只是建立了 Channel。Tag 1.1 的 Channel 建立結束,接着跟進 Tag 1.2 init(channel).
這裏咱們是從 ServerBootstrap 中的doBind 進入的,因此這裏直接找到 io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {
// 獲取serverBootstrap中的options屬性
final Map<ChannelOption<?>, Object> options = options0();
// 將options屬性設置到channel
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 獲取serverBootstrap中的attrs屬性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
// 遍歷attrs屬性
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
// 將當前遍歷的attr初始化到channel
channel.attr(key).set(e.getValue());
}
}
// 獲取channel的pipeline
ChannelPipeline p = channel.pipeline();
// 將serverBootstrap中全部以child開頭的屬性寫入到局部變量,
// 而後將它們初始化到childChannel中
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 將ServerBootstrapAcceptor處理器添加到pipeline
// ServerBootstrapAcceptor處理器用於接收ServerBootstrap中的屬性值,
// 咱們一般稱其爲鏈接處理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
複製代碼
這裏作的事情仍是不少的,基本操做我在上面註釋上也標註出來,還有一些須要繼續跟下去的主要操做,仍是先標記 Tag 而後繼續跟下去。這裏說一下這裏的 options 與 attrs 屬性的賦值,其實就是講咱們 ServerBootstrap 與 Bootstrap 在調用 doBind()
以前經過 option()
與 attr()
設置的參數值,其中 options 屬性設置到了 Channel 的 config 屬性中,attrs 是直接被設置在了 Channel 上的。
在設置完 options 屬性與 attrs 屬性時,接着獲取了當前 channel 的 pipeline,接下來仍是獲取咱們在 doBind()
以前設置的屬性值,以 child 開頭的方法 childOption()
與 childAttr()
設置的屬性值。
這裏使用局部變量記錄了全部 Child 相關的值 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs
主要用於初始化 childChannel 的屬性,new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs))
主要是建立 鏈接處理器。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 將ServerBootstrapAcceptor處理器添加到pipeline
// ServerBootstrapAcceptor處理器用於接收ServerBootstrap中的屬性值,
// 咱們一般稱其爲鏈接處理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
複製代碼
首先這裏想作的事情是:將當前 channel 的 pipeline 中綁定一個初始化處理器 ChannelInitializer ,由於是抽象類,因此須要匿名實現 initChannel方法。 而這些主要的操做是處理 childGroup 裏面的 channel 的初始化操做。這裏我只想主要講一下這個鏈接處理器 ServerBootstrapAcceptor 主要作了什麼,其餘的具體會在後面的 handler 和 pipeline 的時候細說。
**補充:**這裏由於 ServerBootstrap 服務端是對用的有兩個 EventLoopGroup,在服務端,parentGroup 是用於接收客戶端的鏈接,在 parentGroup 接收到鏈接以後是將只是將當前轉給了 childGroup去處理後續操做,而 childGroup 是用來專門處理鏈接後的操做的,不關心 channel 的鏈接任務。這個其實就是 Netty-Server 的 Reactor 線程池模型的處理邏輯。
這裏主要往下說一下這個鏈接處理器: ServerBootstrapAcceptor 。
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
複製代碼
ServerBootstrapAcceptor 構造只是將 ServerBootstrap 中配置的 Child 屬性設置保存下來。而這裏一直說這是鏈接處理器,是由於當客戶端鏈接發送到服務端時,這個處理器會接收客戶端的鏈接並處理。主要是處理方法是 channelRead 中的實現:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg爲客戶端發送來的數據,其爲NioSocketChannel,即子channel,childChannel
final Channel child = (Channel) msg;
// 未來自於ServerBootstrap的child開頭屬性初始化到childChannel中(childHandler、childOptions、childAttrs)
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 將childChannel註冊到selector 須要注意的是,這裏的selector與父channel所註冊的selector不是同一個
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
複製代碼
這裏主要就作了兩件事情:
- 初始化 childChannel
- 將成功從 client 鏈接過來的 channel 註冊到 selector 上。
這裏一直說子channel,就是由於這裏註冊的是兩個 EventLoopGroup,在 Server 端的處理上 netty 線程模型採用「服務端監聽線程」和「IO線程」分離的方式。因此這裏 channelRead
方法就是在 client 端請求鏈接到 server 端時,用於將當前鏈接的 IO 線程綁定到 childChannel 同時註冊到 ChildGroup 中的 Selector 中。線程,模型能夠參考下面的圖:
好了,到這裏 **Tag 1.2 initChannel ** 代碼也分析完了,有些關於 pipeline 、handler、selector 的部分沒有細說由於後面會單獨說,在這裏沒有直接展開。
這裏也畫個圖:到時候將這些圖在整合到一塊兒,如今是的分析過程就像是化整爲零,最後在整合到一塊兒化零爲整。
這裏除了 init(channel) 方法以外,還主要說了下 ServerBootstrapAcceptor 鏈接處理器。其實主要是 netty-server 的線程模型與代碼的結合理解。
我是敖丙,你知道的越多,你不知道的越多,感謝各位人才的:點贊、收藏和評論,咱們下期見!
文章持續更新,能夠微信搜一搜「 三太子敖丙 」第一時間閱讀,回覆【資料】有我準備的一線大廠面試資料和簡歷模板,本文 GitHub github.com/JavaFamily 已經收錄,有大廠面試完整考點,歡迎Star。