想要閱讀Netty源碼的同窗,建議從GitHub上把源碼拉下來,方便寫註釋、Debug調試哦~點我去下載!java
先來看一個簡單的Echo服務端程序,監聽本地的9999端口,有客戶端接入時控制檯輸出一句話,接收到客戶端的數據後直接原樣寫回。git
public class EchoServer {
// 綁定的端口
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) {
// 啓動Echo服務
new EchoServer(9999).start();
}
public void start() {
/* bossGroup負責客戶端的接入 workerGroup負責IO數據的讀寫 */
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("有新的客戶端鏈接...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/* 原樣寫回給客戶端,由於OutBoundHandler還要使用,所以不能釋放msg。 底層數據寫完後會自動釋放。 */
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出現異常了
cause.printStackTrace();
ctx.channel().close();
}
});
}
})
.bind(port);
}
}
複製代碼
代碼仍是很簡單的,接下來會一步步分析,僅僅幾行代碼,Netty到底作了什麼!github
Netty程序要想成功運行,須要EventLoopGroup進行驅動,ServerBootstrap.bind()會將ServerSocketChannel綁定到本地端口,這樣服務端就能夠接收客戶端的鏈接了,可是在這以前,必須確保設置了EventLoopGroup,ServerBootstrap調用bind()前會進行檢查,方法是validate()
,源碼以下:編程
/** * 驗證必要的參數 */
public B validate() {
if (group == null) {//EventLoopGroup必須設置,依賴它驅動程序
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {//依賴channelFactory建立ServerSocketChannel對象
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
複製代碼
先來看看類的繼承關係: 數組
NioEventLoopGroup實現了ScheduledExecutorService,說明它不只能夠執行異步任務,還能夠執行定時任務。實現Iterable接口,是由於EventLoopGroup管理着一組EventLoop,須要對其進行迭代遍歷。MultithreadEventExecutorGroup表明它是一個多線程的事件執行器,而它管理的EventLoop就是個單線程的事件執行器。promise
先來看構造函數,它的構造函數很是多,咱們直接看參數最全的一個:安全
/** * @param nThreads 線程數量,就是NioEventLoop的數量,默認CPU核心數*2 * @param executor NioEventLoop.run()的執行者,默認爲ThreadPerTaskExecutor,NioEventLoop將利用它來啓動一個FastThreadLocalThread並執行 * @param chooserFactory 選擇器工廠,默認DefaultEventExecutorChooserFactory,輪詢選擇NioEventLoop * @param selectorProvider 多路複用器提供者,DefaultSelectorProvider.create() * @param selectStrategyFactory select策略工廠,指示EventLoop應該要作什麼事情 * @param rejectedExecutionHandler 拒絕策略 * @param taskQueueFactory 任務隊列工廠,默認PlatformDependent.newMpscQueue(),Netty實現的高性能無鎖隊列 */
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory, final RejectedExecutionHandler rejectedExecutionHandler, final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}
複製代碼
NioEventLoopGroup會把參數傳給父類構造器MultithreadEventLoopGroup,這裏會對nThreads進行初始化設置:markdown
/** * 參數太多,之後也可能會改變,後面的參數直接用Object...接收了 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
// 若是nThreads=0,則默認爲CPU核心數*2
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
複製代碼
再次調用父類構造器,核心初始化流程在MultithreadEventExecutorGroup中:多線程
/* 建立一個多線程的事件執行器組 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
// 確保線程數大於0
checkPositive(nThreads, "nThreads");
/* 若是沒提供Executor,則建立默認的ThreadPerTaskExecutor。 ThreadPerTaskExecutor依賴於一個ThreadFactory,靠它建立線程來執行任務。 默認的ThreadFactory會使用FastThreadLocalThread來提高FastThreadLocal的性能。 */
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 建立子EventExecutor
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// EventExecutor建立失敗,停機釋放資源
for (int j = 0; j < i; j++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
/* 建立選擇器:簡單輪詢 PowerOfTwoEventExecutorChooser:2的冪次方,位運算 GenericEventExecutorChooser:不然,取餘 有事件/任務要執行時,取出一個EventExecutor */
chooser = chooserFactory.newChooser(children);
// 全部children中止時收到一個通知,優雅停機時用到
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e : children) {
e.terminationFuture().addListener(terminationListener);
}
// 返回一個只讀的children,iterator()迭代時使用
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
複製代碼
EventLoopGroup會管理EventLoop,EventLoop執行任務須要依賴Executor,Executor執行任務須要依賴ThreadFactory建立新的線程,咱們看下Netty默認的Executor實現。併發
默認的ThreadFactory,會建立FastThreadLocalThread線程,來優化FastThreadLocal的性能,關於FastThreadLocal後面會有專門的文章介紹。
// 建立一個默認的線程工廠
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
/* 默認的線程工廠 */
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolId = new AtomicInteger();
// 生成線程名稱時用到:prefix+nextId自增
private final AtomicInteger nextId = new AtomicInteger();
private final String prefix;//前綴
private final boolean daemon;//是否守護線程,默認false
private final int priority;//優先級 默認5
protected final ThreadGroup threadGroup;//所屬線程組
// 省略部分代碼......
@Override
public Thread newThread(Runnable r) {
// 建立一個FastThreadLocalThread線程,優化FastThreadLocal的性能
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
複製代碼
有了ThreadFactory,Executor的實現就很簡單了,當要執行任務的時候,建立一個新線程去跑就行了。EventLoop會在第一次execute()時調用該方法,整個生命週期只會調用一次,即每一個EventLoop只會建立一個線程,後續全部的任務,都是在run()方法裏無限輪詢去執行。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
/* 執行任務時,利用ThreadFactory建立一個新線程去跑。 EventLoop會在第一次execute()時調用該方法,整個生命週期只會調用一次, 即每一個EventLoop只會建立一個線程,後續全部的任務,都是在run()方法裏無限輪詢去執行。 */
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
複製代碼
有了Executor,接下來就會調用newChild()
進行children的初始化,對於NioEventLoopGroup來講,它管理的孩子是NioEventLoop,因此newChild()
會建立NioEventLoop:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// EventLoop須要一個TaskQueue來存放待執行的任務,這裏判斷是否有指定QueueFactory,沒有則使用默認的
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 建立NioEventLoop
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
複製代碼
EventLoopGroup自己不幹活,向它提交任務,它只會交給它的孩子EventLoop執行,因此它依賴一個EventExecutorChooser,當有任務來臨時,從衆多的孩子中挑選出一個,默認的選擇策略就是簡單輪詢。 Netty這裏作了一個小小的優化,若是孩子數量是2的冪次方數會使用位運算,不然取模。源碼以下:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
// 單例模式,經過INSTANCE提供一個單例對象
private DefaultEventExecutorChooserFactory() { }
/* 建立一個選擇器,從一組EventExecutor中挑選出一個。 Netty默認的選擇策略就是:簡單輪詢。 */
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 兩種Chooser實現都有一個AtomicLong計數器,每次next()先自增再取餘
// 若是數量是2的冪次方數,則採用位運算
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
// 不然,對長度進行取餘
return new GenericEventExecutorChooser(executors);
}
}
// 是不是2的冪次方數
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
/* 二進制中,最高位是符號位,0正數、1負數。剩下的就是這個數的絕對值部分。 原碼:設置符號位,其餘0填充 反碼:正數的反碼與原碼相同,負數的反碼:除符號位外,其餘位取反 補碼:正數的補碼與原碼相同,負數的補碼:除符號位外,其餘位取反,而後在最後一位加1(計算機使用補碼) 以下舉例: 5:00000000 00000000 00000000 00000101(原碼) 5:00000000 00000000 00000000 00000101(反碼) 5:00000000 00000000 00000000 00000101(補碼) -5:10000000 00000000 00000000 00000101(原碼) -5:11111111 11111111 11111111 11111010(反碼) -5:11111111 11111111 11111111 11111011(補碼) 5 & -5 = 00000000 00000000 00000000 00000001 = 1 不是2的冪次方數 8 & -8 = 00000000 00000000 00000000 00001000 & 11111111 11111111 11111111 11111000 = 00000000 00000000 00000000 00001000 = 8 是2的冪次方數 */
}
// 2的冪次方數的選擇器,位運算
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 計數器自增 & 長度-1,和HashMap同樣
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
// 普通的選擇器,取餘
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
複製代碼
有了選擇器後,你向EventLoopGroup提交的任務,包括註冊Channel,它都會輪詢出一個EventLoop轉交任務,源碼以下:
@Override
public ChannelFuture register(Channel channel) {
// 選出一個孩子,讓它去執行
return next().register(channel);
}
複製代碼
EventLoopGroup還有一個方法特別有用,那就是shutdownGracefully()
優雅停機,調用後它會中止接受新的任務,並把隊列中等待執行的任務(包括定時任務)處理完(Netty不保證100%處理完),而後釋放資源。因爲EventLoopGroup自己不幹活,所以它依然中止全部的EventLoop,,源碼以下:
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
// 將孩子按個停機
l.shutdownGracefully(quietPeriod, timeout, unit);
}
// 返回一個終止Future,停機後收到通知
return terminationFuture();
}
複製代碼
NioEventLoopGroup差很少就這樣,比較簡單,它只是負責管理EventLoop,核心仍是在EventLoop上。
NioEventLoopGroup在建立時,會根據線程數初始化NioEventLoop。NioEventLoop能夠看做是一個單線程的線程池,也是真正幹活的角色,它的繼承關係以下:
NioEventLoop的主要職責是負責處理註冊到其上的Channel的IO事件,除此以外它還能夠執行用戶提交的系統任務和定時任務,例如:你能夠每隔一段時間檢查一下鏈接是否斷開,若是斷開,客戶端能夠重連,服務端須要及時釋放資源。
一個Channel只能被註冊到一個EventLoop上,一個EventLoop能夠註冊多個Channel。一旦Channel註冊到EventLoop,該EventLoop就要負責處理它整個生命週期的全部事件。事件以回調的方式被觸發,全部的回調任務會被封裝成一個Runnable
放入taskQueue,由EventLoop線程串行化處理。雖然看似「串行化處理」效率低下,可是這避免了線程切換的開銷和數據同步的問題,並且你能夠開啓多個EventLoop,並行處理,充分利用CPU資源。
先看屬性,以下:
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
// 是否禁用SelectionKey優化?默認爲false
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
// Selector重建的閾值,默認512,目的是解決JDK Selector空輪詢Bug
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
// 當前有幾個準備就緒的Channel?selectStrategy會用到,大於0表明有Channel事件須要處理
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
複製代碼
再看構造函數,源碼以下:
/** * 建立一個NioEventLoop實例,用來執行註冊在其上的Channel事件 * @param parent 所屬Group * @param executor * @param selectorProvider 多路複用器提供者,不一樣平臺會使用不一樣實現 * @param strategy Selector.select()的策略 * @param rejectedExecutionHandler 拒絕策略 * @param queueFactory 任務隊列工廠 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
/* 每一個EventLoop都會有一個Selector,用來監聽註冊在其上的Channel事件。 對於BossGroup,處理的是Accept。 對於WorkerGroup,處理的是read、write... SelectorTuple:Selector元組,Netty提供了一個Selector包裝,用來優化select()性能 */
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
複製代碼
run()
方法。NioEventLoop首先建立了兩個TaskQueue來存放待執行的任務,run()
方法會不斷消費任務。雖然能夠多線程併發的往taskQueue中提交任務,可是因爲EventLoop是單線程的,全部taskQueue的生產消費模型是:**多生產者單消費者。**針對這種消費場景,Netty實現了高性能的無鎖隊列「MpscQueue」,Queue的建立源碼以下:
// 建立TaskQueue,存放待執行的任務
private static Queue<Runnable> newTaskQueue( EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
// 默認建立Netty實現MpscQueue:Netty實現的高性能無鎖隊列,適用於多個生產者,單個消費者。
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
/* 根據最大隊列數建立Queue。 MpscQueue:Netty實現的高性能無鎖隊列,適用於多個生產者,單個消費者。 多個線程能夠併發往EventLoop提交任務,可是EventLoop自己是單線程消費的。 */
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
複製代碼
關於MpscQueue,後面會專門寫文章介紹。
建立完taskQueue就是調用父類構造器進行相應的賦值操做了,這裏略過,下面主要看openSelector()
。 每一個NioEventLoop被建立時,都會同時建立一個Selector
多路複用器,這是JDK提供的,不熟悉的同窗去看看Java Nio編程。EventLoopGroup會將Channel註冊到NioEventLoop上,實際上就是註冊到Selector
上了。這樣NioEventLoop就能夠經過Selector
來監聽準備就緒的Channel,而後根據事件類型去觸發相應的回調,因此Selector
是NioEventLoop的核心。
openSelector()
會作一個優化,將JDK的SelectorImpl
的selectedKeys、publicSelectedKeys屬性由HashSet替換成Netty的SelectedSelectionKeySet
,內部是一個數組。當Selector監聽到有準備就緒的Channel時,會往HashSet裏添加SelectionKey,當SelectionKey比較多時,就容易發生哈希衝突,時間複雜度會增長,而SelectedSelectionKeySet
內部使用數組來保存,避免了哈希衝突,性能會有必定的提高。
/* 打開一個Selector多路複用器 */
private SelectorTuple openSelector() {
final Selector unwrappedSelector;//未包裝的原生Selector
try {
// 基於SelectorProvider打開一個原生的Selector,這是JDK提供的。
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 若是禁用了SelectionKey優化,則unwrappedSelector和selector都指向原生Selector
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 不然,使用SelectedSelectionKeySet來優化SelectionKey
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 反射獲取SelectorImpl的selectedKeys、publicSelectedKeys屬性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
/* 經過反射將SelectorImpl的selectedKeys、publicSelectedKeys替換爲selectedKeySet來提高性能。 */
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 建立一個SelectorTuple,包含一個原生的Selector,和優化過的Selector。
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
複製代碼
SelectedSelectionKeySet部分源碼以下:
/** * Selector的 Set<SelectionKey> selectedKeys * 默認用HashSet存儲,當有Channel準備就緒時,會添加到HashSet中,但若是發生衝突,HashSet的時間複雜度是O(n)鏈表/O(log n)紅黑樹 * Netty經過反射將selectedKeys、publicSelectedKeys替換成SelectedSelectionKeySet * 使用數組來避免哈希衝突 */
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
// 使用數組來保存,默認長度1024
SelectionKey[] keys;
int size;//keys大小
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
// 省略部分代碼.......
}
複製代碼
建立完Selector
,NioEventLoop的初始化就完成了,但此時線程並未啓動,Netty這裏作了懶加載處理,只有當EventLoop第一次被調用execute()
執行任務時纔會經過executor
去建立線程跑run()
方法。
用戶不主動提交任務的前提下,對於BossGroup的EventLoop來講,線程是在調用bind()
方法將ServerSocketChannel註冊到EventLoop時被啓動的。對於WorkerGroup的EventLoop來講,線程是在BossGroup接收到客戶端鏈接時,將SocketChannel註冊到WorkerGroup時被啓動的。
不論是ServerSocketChannel.bind()
仍是接收到客戶端鏈接,都是要將Channel註冊到EventLoop,再由EventLoop去輪詢處理事件。register()
源碼以下:
// 註冊Channel
@Override
public ChannelFuture register(Channel channel) {
// 建立一個DefaultChannelPromise,再註冊,目的是讓用戶能夠在註冊完成時收到通知
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 轉交給Channel.Unsafe完成
promise.channel().unsafe().register(this, promise);
return promise;
}
複製代碼
這裏須要說下Channel.Unsafe
接口,對於bind()
、write()
、read()
等這類方法,因爲須要和底層API交互,Netty對開發者屏蔽了底層實現,不但願由開發者調用這類方法,因而將它們封裝到Channel.Unsafe
中,從名字中也能看出來,這些操做是不安全的,開發者儘可能不要去本身調用。
register()
操做的目的其實就是將JDK的SocketChannel註冊到Selector
多路複用器上,因爲須要和底層API交互,因而轉交給Channel.Unsafe處理,源碼在io.netty.channel.AbstractChannel.AbstractUnsafe#register()
,以下所示:
/* 將Channel註冊到EventLoop,其實就是調用JDK底層的:SocketChannel.register(selector)。 將Channel註冊到多路複用器。 */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
// 重複註冊校驗
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// 檢查是否兼容,Channel和EventLoop模式不能混用,例如Oio和Nio不兼容
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
/* 當前線程是不是EventLoop線程? 若是是就直接執行,不然提交一個任務,後面串行化執行。 */
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
複製代碼
register()
作了一些檢查,而後確保由EventLoop來執行註冊操做,前面說過了,EventLoop會負責處理Channel的全部事件。register0()
完成註冊,並觸發相應的事件回調,經過Pipeline傳播出去。
private void register0(ChannelPromise promise) {
try {
// 確保Channel是打開狀態
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// JDK原生channel.register(selector)
doRegister();
neverRegistered = false;
registered = true;
// 觸發 ChannelHandler.handlerAdded()回調
pipeline.invokeHandlerAddedIfNeeded();
// 通知promise操做成功了,觸發回調
safeSetSuccess(promise);
// 註冊完成,觸發ChannelRegistered回調,經過pipeline傳播出去
pipeline.fireChannelRegistered();
// 若是鏈接激活了,則觸發active事件,只在首次註冊時會觸發
if (isActive()) {
if (firstRegistration) {
// 觸發ChannelRegistered回調,經過pipeline傳播出去
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// 異常了,關閉資源,觸發失敗通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
複製代碼
doRegister()
會調用JDK底層的註冊,源碼以下:
// 真正調用JDK底層API完成註冊
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 獲取Java原生SocketChannel註冊到未包裝的原生Selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
複製代碼
完成SocketChannel的註冊後,EventLoop就能夠經過輪詢Selector
來監聽準備就緒的Channel了,後面就是一系列的事件處理了。
在調用io.netty.channel.AbstractChannel.AbstractUnsafe#register()
時,EventLoop線程已經啓動並執行run()
方法,在run()
方法裏,EventLoop線程會執行一個死循環,直到線程被中止。
在死循環裏,EventLoop線程會不斷輪詢Selector
是否有準備就緒的Channel須要處理?taskQueue是否有任務在等待執行?scheduledTaskQueue是否有定時任務須要執行?NioEventLoop.run()
是任務處理的關鍵。
@Override
protected void run() {
/* 無效空輪詢的次數 JDK的Selector存在Bug,會致使空輪詢,CPU飆升。 Netty會檢測Selector.select()空輪詢次數,超過SELECTOR_AUTO_REBUILD_THRESHOLD則重建Selector。 有效輪詢:要麼有IO事件到達、要麼執行了Task。 */
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
/* NioEventLoop的執行策略: 有任務待執行嗎? 沒有:Selector.select()阻塞,等待IO事件到達(定時任務判斷) 有:非阻塞調用Selector.selectNow(), */
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:// 重試IO循環
continue;
case SelectStrategy.BUSY_WAIT:// NIO不支持忙等,走SELECT
case SelectStrategy.SELECT: // 隊列中沒有任務要執行
// 下一個要執行的定時任務截止時間
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;//沒有定時任務
}
nextWakeupNanos.set(curDeadlineNanos);
try {
/* 若是沒有任務要執行,則在下一個任務要執行前,阻塞等待IO事件。 沒有定時任務,則等待超時爲Long.MAX_VALUE,無限等待 */
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;//無效輪詢次數+1,後面會判斷是否重置
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
// 優先處理全部的IO事件後再去處理Task
try {
if (strategy > 0) {// 表明有準備就緒的Channel待處理
processSelectedKeys();
}
} finally {
// 處理完IO事件後,執行全部Task
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// 先處理IO事件,並記錄所花的時間
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 根據ioTime和ioRatio,計算處理Task能分配的時間
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
/* 有待執行的任務,且Selector.selectNow()返回0,沒有IO事件須要處理,那就先執行少許的Task。 每64個任務檢查一次超時,若是有足夠的任務,那麼最少執行64個。 因此,不該該提交耗時任務,阻塞IO線程!!! */
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
// 若是執行了任務或者有IO事件,說明此次輪詢是有效的,重置selectCnt
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // 意外喚醒時,是否須要重置selectCnt,解決Selector空輪詢Bug
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// 無論正常/異常中止,都要關閉,釋放資源。
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
複製代碼
SelectStrategy
是一個選擇策略,其實就是告訴EventLoop線程須要作什麼事。
Selector.select()
上等待就緒的Channel。NioEventLoop在沒有Channel事件,又沒有taskQueue任務時,會調用nextScheduledTaskDeadlineNanos()
計算距離下一次要執行的定時任務還有多長時間,在這以前,它會調用Selector.select(curDeadlineNanos)
阻塞等待Channel事件(5微妙內不會阻塞),源碼以下:
// 在下一個定時任務要執行前,等待IO事件
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
// 沒有定時任務,直接阻塞
return selector.select();
}
// 若是截止時間在5微秒內,超時將爲0
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
複製代碼
被喚醒,要麼是由於有Channel事件了,要麼是超時了須要執行定時任務了,開始走下面的邏輯。
ioRatio
表明EventLoop執行IO事件和Task的時間比例,100表明優先執行完全部的IO事件再執行系統任務,不然會根據這個比例去調整執行Task所消耗的時間。
processSelectedKeys()
會挨個處理準備就緒的Channel事件,前面說過,Netty默認會使用數組代替HashSet優化SelectionKey,這裏會進行判斷:
/* 處理SelectionKey,分爲優化後的處理,和普通處理 優化:HashSet<SelectionKey> --> SelectionKey[] */
private void processSelectedKeys() {
if (selectedKeys != null) {
// 說明Netty將HashSet優化爲數組了,能夠高效處理
processSelectedKeysOptimized();
} else {
// 沒優化過,普通處理
processSelectedKeysPlain(selector.selectedKeys());
}
}
複製代碼
不論如何,最終都會遍歷selectedKeys,挨個處理,源碼以下:
// 處理SelectionKey事件
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {//有效性檢查,Channel、Selector可能已經被關閉
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
// 準備就緒的事件標誌位
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 鏈接就緒
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 數據可寫
ch.unsafe().forceFlush();
}
// 數據可讀、有新的鏈接接入
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 對於ServerSocketChannel只關心OP_ACCEPT事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
複製代碼
針對不一樣的就緒事件,會調用Channel.Unsafe對應的方法。
對於OP_CONNECT
事件,會調用unsafe.finishConnect()
方法,它主要就是判斷鏈接是否激活,若是激活則觸發ChannelActive
回調,並經過Pipeline傳播出去。
對於OP_WRITE
事件,會調用ch.unsafe().forceFlush()
方法,這裏的ch
是指客戶端Channel,它會將ChannelOutboundBuffer
緩衝的數據轉換成JDK的ByteBuffer
並調用底層API經過SocketChannel響應給客戶端。
對於OP_ACCEPT
事件,ServerSocketChannel會調用io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read()
方法來接收客戶端鏈接:
/* NioEventLoop.processSelectedKey() 當Channel有 OP_READ | OP_ACCEPT 事件時調用該方法。 對於服務端Channel來講,就是 OP_ACCEPT. */
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 接收對端數據時,ByteBuf的分配策略,基於歷史數據動態調整初始化大小,避免太大浪費空間,過小又會頻繁擴容
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
/* 對於ServerSocketChannel來講,就是接收一個客戶端Channel,添加到readBuf */
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 遞增已讀取的消息數量
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 經過pipeline傳播ChannelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// 讀取完畢的回調,有的Handle會根據本次讀取的總字節數,自適應調整下次應該分配的緩衝區大小
allocHandle.readComplete();
// 經過pipeline傳播ChannelReadComplete事件
pipeline.fireChannelReadComplete();
if (exception != null) {// 事件處理異常了
// 是否須要關閉鏈接
closed = closeOnReadError(exception);
// 經過pipeline傳播異常事件
pipeline.fireExceptionCaught(exception);
}
if (closed) {//若是須要關閉,那就關閉
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
複製代碼
主要是看doReadMessages()
方法,Netty會調用accept()
獲取到一個JDK原生SocketChannel,並把它包裝成Netty的NioSocketChannel
:
/* 對於服務端Channel來講,處理 OP_ACCEPT 事件就是從Channel中接收一個客戶端Channel。 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 獲取客戶端Channel,調用的就是JDK原生方法:serverSocketChannel.accept()
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 將原生SocketChannel包裝成Netty的NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
複製代碼
接收到客戶端的鏈接,並把它封裝成NioSocketChannel
,隨後會觸發channelRead
回調,在ServerBootstrapAcceptor.ServerBootstrapAcceptor
中,會把客戶端Channel註冊到WorkerGroup中,由WorkerGroup去完成後續的IO讀寫事件,BossGroup只負責鏈接的創建,這就是經典的Reactor線程模型。
一樣對於OP_ACCEPT
事件,SocketChannel會調用io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()
來接收對端發送的數據:
/* 客戶端發送數據時觸發。 見 io.netty.channel.nio.NioEventLoop.processSelectedKey */
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配一個ByteBuf,大小能容納可讀數據,又不過於浪費空間。
byteBuf = allocHandle.allocate(allocator);
/* doReadBytes(byteBuf):ByteBuf內部有ByteBuffer,底層仍是調用了SocketChannel.read(ByteBuffer) allocHandle.lastBytesRead()根據讀取到的實際字節數,自適應調整下次分配的緩衝區大小。 */
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 沒數據可讀了.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 遞增已經讀取的消息數量
allocHandle.incMessagesRead(1);
readPending = false;
// 經過pipeline傳播ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());//判斷是否須要繼續讀
// 讀取完畢,pipeline傳播ChannelReadComplete事件
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
複製代碼
大致邏輯和ServerSocketChannel相似,只是接收到的數據再也不是SocketChannel,而是ByteBuf。底層仍是調用了JDK原生的SocketChannel.read(ByteBuffer)
,再將ByteBuffer轉換成Netty的ByteBuf。
數據接收到後,繼續經過Pipeline傳播ChannelRead
和ChannelReadComplete
回調。
到這裏,基本就把EventLoop說的差很少了,總體工做流程已經瞭解了。細節的地方如:ByteBuf是如何動態分配的,ByteBuf是如何寫出到SocketChannel的等等,這些後面專門寫文章講吧,否則這篇文章太長了。
前面分別講了NioEventLoopGroup和NioEventLoop單獨的工做流程,尚未把整個完整的流程給串起來。做爲服務端啓動的引導類,ServerBootstrap是服務端整個啓動流程的入口,核心方法 bind()
會調用initAndRegister()
建立一個ServerSocketChannel,並把它註冊到BossGroup的EventLoop的 Selector
上,這樣BossGroup就能夠處理鏈接事件了。但此時是不會有鏈接事件的,由於尚未綁定到本地端口,客戶端沒法創建鏈接。 註冊完後,ServerBootstrap隨後會調用doBind0()
將ServerSocketChannel綁定到本地端口,至此服務端啓動完成,耐心等待Channel事件便可。
/* 建立一個ServerSocketChannel,並綁定到本地端口 */
public ChannelFuture bind(SocketAddress localAddress) {
// 數據驗證,group/channelFactory不能爲null
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
/* 1.反射建立ServerSocketChannel 2.ServerSocketChannel的初始化,建立Pipeline、設置Options、Attrs。 3.將ServerSocketChannel註冊到EventLoop 此時,EventLoop能夠開始輪詢Accept事件了,可是因爲還未bind本地端口,因此不會有事件發生。 */
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
// 若是異常了,直接返回
return regFuture;
}
if (regFuture.isDone()) {
// Register成功了,則開始綁定端口
ChannelPromise promise = channel.newPromise();
/* 將Channel綁定到本地端口,底層仍是調用了JDK原生的channel.bind()。 因爲bind()是一個出站事件,須要經過Pipeline傳播,因此會轉交給Pipeline執行:pipeline.bind(localAddress, promise)。 最終會傳播到DefaultChannelPipeline的HeadContext.bind(),它又會轉交給Channel.Unsafe.bind()。 Channel.Unsafe.bind()最終會調用JDK原生的javaChannel().bind(),詳見:io.netty.channel.socket.nio.NioServerSocketChannel.doBind() 綁定成功後,會觸發promise的回調 */
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 由於是異步的,防止Register還沒完成,經過註冊回調來綁定。
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
複製代碼
先看initAndRegister()
,初始化ServerSocketChannel並註冊到BossGroup:
// 初始化和註冊
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/* channelFactory根據Channel.class反射建立實例 服務端:ServerSocketChannel 客戶端:SocketChannel */
channel = channelFactory.newChannel();
/* 初始化Channel:服務端和客戶端 1.設置ChannelPipeline 2.設置options 3.設置attrs */
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 將Channel註冊到EventLoop,從Group中輪詢出一個EventLoop
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
複製代碼
init()
會初始化Channel,分爲服務端和客戶端兩種。客戶端Channel初始化很簡單,就是設置Pipeline、Options、Attrs,這裏就不貼代碼了。服務端複雜一些,除了設置自身的Pipeline、Options、Attrs,還要負責初始化客戶端接入的Channel,並把它註冊到WorkerGroup:
// 服務端Channel初始化
@Override
void init(Channel channel) {// 這裏的channel是ServerSocketChannel
// 設置options
setChannelOptions(channel, newOptionsArray(), logger);
// 設置attrs
setAttributes(channel, newAttributesArray());
// 初始化ServerSocketChannel的ChannelPipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
// 和ServerSocketChannel創建鏈接的客戶端SocketChannel須要設置的options和attrs
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
/* 往服務端Channel添加Handler: 1.封裝HandlerAdded回調任務,保存在PendingHandlerCallback 2.後續的register()操做會觸發回調:pipeline.invokeHandlerAddedIfNeeded(); */
p.addLast(new ChannelInitializer<Channel>() {
/* initChannel()什麼時候被調用? ChannelHandler被添加到Pipeline有一個對應的回調:handlerAdded() addLast()會提交一個任務,讓EventLoop來觸發這個回調 ChannelInitializer在handlerAdded()回調裏會執行該初始化方法。 */
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();//ServerBootstrap.handler()設置的
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// ServerBootstrapAcceptor是服務端接收客戶端鏈接的核心
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
複製代碼
服務端Channel在初始化Pipeline的時候會添加一個ServerBootstrapAcceptor
,它是服務端接收客戶端鏈接的核心。
先看屬性,它保留了客戶端鏈接時建立Channel的必要信息:
private final EventLoopGroup childGroup;// Reactor模型中的WorkerGroup
private final ChannelHandler childHandler;// 客戶端Channel的ChannelHandler
private final Entry<ChannelOption<?>, Object>[] childOptions;// 客戶端Channel的Options
private final Entry<AttributeKey<?>, Object>[] childAttrs;// 客戶端Channel的Attrs
private final Runnable enableAutoReadTask; // 啓用自動讀取的任務
複製代碼
構造函數就不貼代碼了,都是屬性賦值操做。
須要重點關注的方法是channelRead()
,前面已經分析過了,BossGroup監聽到有客戶端接入時會觸發該回調:
/* 有客戶端鏈接時,觸發. 見 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;// 這裏的Channel是SocketChannel
// 設置客戶端Channel的Pipeline、Options、Attrs
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
/* 將客戶端Channel註冊到WorkerGroup: 1.next()輪詢出一個EventLoop.register() 2.Channel.Unsafe.register(),Channel註冊到Selector 3.觸發各類回調 Channel一旦註冊到EventLoop,就由該EventLoop負責處理它整個生命週期的全部事件。 */
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 若是註冊失敗,強制關閉鏈接
if (!future.isSuccess()) {
// 底層就是調用原生JDK的關閉方法:javaChannel().close();
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
複製代碼
這裏處理的是客戶端的接入,設置Options、Attrs、Pipeline,並註冊到WorkerGroup,後續的全部讀寫事件交給WorkerGroup處理。
在doBind0()
沒調用以前,全部的這一切都不會發生,因此最後只要看一下Netty是如何將ServerSocketChannel綁定到本地端口的,整個流程就所有分析結束了。
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// 往Channel綁定的EventLoop提交一個綁定任務,轉交給Channel去執行
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
複製代碼
因爲bind()
是一個出站事件,因此會轉交給Pipeline執行,須要它把事件傳播出去。
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
複製代碼
Pipeline會從TailContext開始傳播,TailContext會日後尋找能處理bind
事件的ChannelHandler:
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
/* TailContext會日後尋找能處理bind事件的ChannelHandler。 由於是出站事件,因此調用findContextOutbound() */
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {// 讓EventLoop線程串行化處理
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
複製代碼
若是用戶沒有重寫bind()
回調的話,TailContext會把事件傳播給HeadContext,因爲bind
操做須要和底層API交互,HeadContext會將操做轉交給Channel.Unsafe
執行,因此最終會調用io.netty.channel.AbstractChannel.AbstractUnsafe#bind()
,源碼以下:
/* 將ServerSocketChannel綁定到本地端口,如何被觸發的? 1.Bootstrap.bind()會往Channel註冊的EventLoop提交一個任務:Channel.bind() 2.因爲bind()是一個出站事件,須要被Pipeline傳播出去,因而會被轉交給Pipeline執行:Pipeline.bind() 3.bind()事件從TailContext開始傳播,不出意外會傳播到HeadContext。 4.HeadContext會再將bind()任務轉交給Channel.Unsafe執行,因而被觸發。 總結:Channel.bind()會將事件經過Pipeline進行傳播,從TailContext到HeadContext。 */
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();//確保是EventLoop線程執行
// promise標記爲不可取消 確保Channel是Open狀態,若是close了就沒法bind了
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();//鏈接是否活躍
try {
/* 真正的綁定操做,子類實現。 看NioServerSocketChannel實現,就是調用了JDK原生的javaChannel().bind(); */
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
// 鏈接處於活躍狀態,觸發Active回調,往EventLoop提交一個任務,經過Pipeline傳播出去。
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
複製代碼
doBind()
又會調回到NioServerSocketChannel.doBind()
,其實就是調用JDK原生的ServerSocketChannel.bind(localAddress , backlog)
,源碼以下:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 獲取JDK的ServerSocketChannel.bind()
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
複製代碼
綁定成功後就能夠正常處理客戶端的接入了,以後客戶端Channel都會由WorkerGroup驅動IO的讀寫。
這篇文章分析了Netty服務端啓動的全流程,從ServerSocketChannel的建立到綁定端口,再到BossGroup驅動客戶端鏈接的接入和WorkerGroup驅動數據的讀寫。 還重點分析了NioEventLoopGroup和NioEventLoop的工做模式,認真讀完,相信你會對Netty總體的工做機制有所瞭解。
數據接收ByteBuf的分配,數據write
的底層細節沒有介紹到,包括Netty對高性能所做的努力也尚未過多介紹,考慮到篇幅緣由,後面會專門再開一篇文章。
寫到這裏就結束了,此時此刻,個人電腦編輯器已經很是卡了,艱難的敲下這段文字後,是時候說再見了!!!