public class Server {
private ServerBootstrap serverBootstrap;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
public static void main(String[] args) throws InterruptedException {
System.out.println("服務啓動");
Server server = new Server();
server.start();
}
private void start() throws InterruptedException {
try {
serverBootstrap=new ServerBootstrap();
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup(4);
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new InitHandler())
.childHandler(new IOChannelInitialize());
ChannelFuture future = serverBootstrap.bind(8802).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private class IOChannelInitialize extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel");
ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
ch.pipeline().addLast(new IOHandler());
}
}
}
複製代碼
步驟說明java
1.1 建立 ServerBootstrap 實例,它是 netty 的啓動輔助類,提供了一系列的方法用於設置服務 端啓動相關的參數。底層經過門面模式對各類能力進行抽象和封裝,儘可能不須要用戶跟過 多的底層 API 打交道,下降用戶的開發難度react
1.2 NioEventLoopGroup 是 netty Reactor 線程池,bossGroup 監聽和 accept 客戶端鏈接,workGroup 則處理 IO ,編解碼promise
1.3 綁定服務端 NioServerSocketChannel安全
1.4 設置一些參數bash
1.5 初始化 pipeline 並綁定 handler ,pipeline 是一個負責處理網絡事件的職責鏈,負責管理和執行 ChannelHandler ,設置系統提供的 IdleStateHandler 和自定義 IOHandler網絡
1.6 serverBootstrap.bind(8802) 這裏纔是啓動服務端綁定端口異步
1.7 future.channel().closeFuture().sync(); 等待服務端關閉socket
1.8 優雅關閉ide
NioEventLoopGroup 不單單是 I/O 線程,除了負責 I/O 的讀寫,還負責系統 Task 和定時任務oop
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
複製代碼
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
複製代碼
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
複製代碼
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
複製代碼
繼續,如下是精簡代碼
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
...
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
...
children[i] = newChild(threadFactory, args);
...
}
複製代碼
MultithreadEventExecutorGroup 實現了線程的建立和線程的選擇,咱們看看 newChild 方法( NioEventLoopGroup 類的方法),newChild 實例化線程
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
複製代碼
建立了一個 NioEventLoop
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
複製代碼
跟着 super
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
複製代碼
代碼有精簡,繼續
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
SingleThreadEventExecutor.this.run();
}
}
});
複製代碼
在這裏實例化了一個線程,並在 run 中調用 SingleThreadEventExecutor 的 run 方法,這個線程在哪裏啓動的呢,咱們繼續往下看
總結:
NioEventLoopGroup 實際就是 Reactor 線程池,負責調度和執行客戶端的接入、網絡讀寫事件的處理、用戶自定義任務和定時任務的執行。
ServerBootstrap 是服務端的啓動輔助類,父類是 AbstractBootstrap ,與之相對應的客戶端啓動輔助類是 Bootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
volatile EventLoopGroup group;
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler handler;
}
複製代碼
將 bossGroup 傳給父類,workGroup 賦值給 serverBootstrap 的 childGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
複製代碼
serverBootstrap.channel(NioServerSocketChannel.class)
複製代碼
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
複製代碼
繼續跟 new BootstrapChannelFactory
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
複製代碼
BootstrapChannelFactory 是一個繼承了 ChannelFactory 的內部類,從名稱上就能看出,這是一個 channel 工廠類,重寫了父類的 newChannel 方法,經過反射建立 NioServerSocketChannel 實例,後面會告訴你是在哪裏調用到的
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
複製代碼
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
複製代碼
這裏的 option 方法是父類 AbstractBootstrap 的方法,options 是一個有序的非線程安全的雙向鏈表,加鎖添加
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
複製代碼
childOption 是子類 serverBootstrap 的方法
childOption 和 option 的區別:
option : 主要是設置 ServerChannel 的一些選項
childOption : 主要設置 ServerChannel 的子 channel 的選項,即 option
針對的是 boss 線程而 childOption 針對的是 work 線程池
serverBootstrap.handler(new InitHandler())
複製代碼
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
複製代碼
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
複製代碼
handler 和 childHandler 的區別
Handler 是屬於服務端 NioServerSocketChannel ,只會建立一次 childHandler 是屬於每個新建的 NioSocketChannel ,每當有一個鏈接上來,都會調用
serverBootstrap.bind(8802).sync()
複製代碼
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
複製代碼
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
複製代碼
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
複製代碼
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
init(channel);
ChannelFuture regFuture = group().register(channel);
return regFuture;
}
複製代碼
channelFactory 是 serverBootstrap.channel() 時建立的,在這裏調用反射建立 NioServerSocketChannel 實例
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
複製代碼
options() 是 serverBootstrap.option() 賦值的 AbstractBootstrap 類的 options 雙向鏈表成員變量,在這裏將 options 和 attrs 注入 channel 中 P.addLast() 爲 NioServerSocketChannel 加入新的 handler (處理器),這裏 pipeline 相似於 Servlet 的過濾器,管理全部 handler
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
複製代碼
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
複製代碼
eventLoop.inEventLoop() 用來判斷啓動線程與當前線程是否相同,相同表示已經啓動,不一樣則有兩種可能:未啓動或者線程不一樣。
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
複製代碼
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
thread.start();
}
}
}
複製代碼
咱們在最開始2.1裏面 SingleThreadEventExecutor 構造方法內的 thread 就是在這裏啓動的,咱們再回到2.1的
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
SingleThreadEventExecutor.this.run();
}
}
});
複製代碼
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
複製代碼
在這裏異步執行,輪詢 select 客戶端的 accept ,而且 runAllTasks 全部的任務
@Override
public ChannelFuture register(Channel channel) {
...
channel.unsafe().register(this, promise);
return promise;
}
複製代碼
如下是精簡後的代碼(位於 AbstractChannel 類的 AbstractUnsafe 內部類)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
...
}
複製代碼
private void register0(ChannelPromise promise) {
...
doRegister();
...
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
...
}
複製代碼
繼續(位於 AbstractNioChannel 類)
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
...
}
}
複製代碼
將 NioServerSocketChannel 註冊到 boss 線程池 NioEventLoop 的 Selector 上。
在這裏應該註冊 OP_ACCEPT(16) 到多路複用器上
註冊0的緣由:
(1)註冊方法是多態的,它既能夠被 NioServerSocketChannel 用來監聽客戶端的鏈接接入,也能夠註冊 SocketChannel 用來監聽網絡讀或寫操做
(2)經過 SelectionKey 的 interestOps(int ops) 方法能夠方便地修改監聽操做位
再看 pipeline.fireChannelActive()
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
複製代碼
@Override
public Channel read() {
pipeline.read();
return this;
}
複製代碼
@Override
public ChannelPipeline read() {
tail.read();
return this;
}
複製代碼
@Override
public ChannelHandlerContext read() {
...
next.invokeRead();
...
}
複製代碼
private void invokeRead() {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
複製代碼
進到 HeadContext 的 read
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
複製代碼
@Override public final void beginRead() { ... doBeginRead(); ... }
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
複製代碼
最終在這裏將 selectionKey 的監聽操做位改成 OP_READ
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
複製代碼
將方法丟到 reactor 線程池任務隊列中執行,會先判斷註冊是否成功,成功則繼續執行bind方法
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
複製代碼
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
複製代碼
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
...
next.invokeBind(localAddress, promise);
...
}
複製代碼
因爲 bind 事件是出站事件,尋找出站的 handler ,執行 invokeBind( ) 方法
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
複製代碼
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
複製代碼
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
doBind(localAddress);
...
}
複製代碼
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
複製代碼
通過多層 bind 深刻,最後在這裏能夠看到,仍是會調用Java底層的nio進行 socket bind 自此,服務端啓動流程解析完畢,咱們總結一下
① 經過 ServerBootstrap 輔助啓動類,配置了 reactor 線程池,服務端 Channel ,一些配置參數,客戶端鏈接後的 handler
② 將 ServerBootstrap 的值初始化,並註冊 OP_ACCEPT 到多路複用器
③ 啓動 reactor 線程池,不斷循環監聽鏈接,處理任務 ④ 綁定端口