tomcat 異步線程模型大概能夠理解爲:acceptor負責接受新來的鏈接,而後把鏈接初始化後丟給poller來作io,而後又交給處理業務的exec線程池異步處理業務邏輯。java
因此若是IO線程和handler 在一個線程裏面,若是handler 執行某個邏輯比較耗時,好比查數據庫、服務間通訊等會嚴重影響整個netty的性能。這時候就須要考慮將耗時操做異步處理。數據庫
netty 中加入線程池有兩種方式:bootstrap
第一種是handler 中加入線程池promise
第二種是Context 中加入線程池tomcat
核心代碼以下:服務器
EchoServer異步
package cn.xm.netty.example.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new EchoServerHandler2()); p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
EchoServerHandlersocket
package cn.xm.netty.example.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.DefaultEventLoopGroup; import io.netty.util.CharsetUtil; public class EchoServerHandler extends ChannelInboundHandlerAdapter { private static final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(16); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName()); // 強轉爲netty的ByteBuffer(實際就是包裝的ByteBuffer) ByteBuf byteBuf = (ByteBuf) msg; System.out.println("客戶端發送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址:" + ctx.channel().remoteAddress()); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!0!", CharsetUtil.UTF_8)); // ctx.channel().eventLoop().execute(new Runnable() { eventExecutors.execute(new Runnable() { @Override public void run() { // 好比這裏咱們將一個特別耗時的任務轉爲異步執行(也就是任務提交到NioEventLoop的taskQueue中) System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getName()); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!1!", CharsetUtil.UTF_8)); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
EchoServerHandler2ide
package cn.xm.netty.example.echo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultEventLoopGroup; public class EchoServerHandler2 extends ChannelOutboundHandlerAdapter { private static final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(16); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { super.write(ctx, msg, promise); System.out.println("cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: " + Thread.currentThread().getName()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
EchoClientoop
package cn.xm.netty.example.echo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } }
EchoClientHandler
package cn.xm.netty.example.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("ClientHandler ctx: " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服務器!", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 強轉爲netty的ByteBuffer(實際就是包裝的ByteBuffer) ByteBuf byteBuf = (ByteBuf) msg; System.out.println("服務器會送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("服務器地址:" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
先啓動服務端,而後啓動客戶端,而後查看服務端控制檯以下:
cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: nioEventLoopGroup-3-1 客戶端發送的消息是:hello, 服務器! 客戶端地址:/127.0.0.1:54247 cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1 java.lang.Runnable.run thread: defaultEventLoopGroup-4-1 cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1
能夠看到上面的邏輯是:
(1) 當IO線程輪詢到一個socket 事件後,IO線程開始處理,當走到EchoServerHandler 比較耗時的操做以後,將耗時任務交給線程池。
(2) 當耗時任務執行完畢再執行ctx.writeAndFlush 時,會將這個任務再交給IO線程,過程以下(也就是最終的寫操做都會交給IO線程):
1》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這裏走的是else 代碼塊的代碼,由於 當前線程不屬於IO線程裏面, 因此就走else。 else 代碼塊的邏輯是建立一個寫Task, 而後調用io.netty.channel.AbstractChannelHandlerContext#safeExecute:
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { executor.execute(runnable); } catch (Throwable cause) { try { promise.setFailure(cause); } finally { if (msg != null) { ReferenceCountUtil.release(msg); } } } }
能夠看到是調用execotor.execute 方法加入本身的任務隊列裏面。io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
補充:Handler 中加異步還有一種方式就是建立一個任務,加入到本身的任務隊列,這個實際也佔用的是IO線程
package cn.xm.netty.example.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName()); // 強轉爲netty的ByteBuffer(實際就是包裝的ByteBuffer) ByteBuf byteBuf = (ByteBuf) msg; System.out.println("客戶端發送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址:" + ctx.channel().remoteAddress()); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!0!", CharsetUtil.UTF_8)); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { // 好比這裏咱們將一個特別耗時的任務轉爲異步執行(也就是任務提交到NioEventLoop的taskQueue中) System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getName()); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!1!", CharsetUtil.UTF_8)); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
測試: 能夠看出異步也用的是當前的IO線程
cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: nioEventLoopGroup-3-1 客戶端發送的消息是:hello, 服務器! 客戶端地址:/127.0.0.1:53721 cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1 java.lang.Runnable.run thread: nioEventLoopGroup-3-1 cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1
EchoServer 代碼改造
package cn.xm.netty.example.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); DefaultEventLoopGroup group = new DefaultEventLoopGroup(16); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(group, new EchoServerHandler2()); p.addLast(group, serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
調用p.addLast 的時候指定使用的線程組。 若是不指定,默認使用的是IO線程組。 若是指定了就使用指定的線程組。 這樣就相似於Tomcat8 的線程模型。接收請求-》IO-》處理 分別在不一樣的線程裏面。
EchoServerHandler代碼改造: 正常處理,無需異步開線程
package cn.xm.netty.example.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName()); // 強轉爲netty的ByteBuffer(實際就是包裝的ByteBuffer) ByteBuf byteBuf = (ByteBuf) msg; System.out.println("客戶端發送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址:" + ctx.channel().remoteAddress()); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!0!", CharsetUtil.UTF_8)); // 好比這裏咱們將一個特別耗時的任務轉爲異步執行(也就是任務提交到NioEventLoop的taskQueue中) System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getName()); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!1!", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: defaultEventLoopGroup-4-1 客戶端發送的消息是:hello, 服務器! 客戶端地址:/127.0.0.1:52966 cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: defaultEventLoopGroup-4-1 java.lang.Runnable.run thread: defaultEventLoopGroup-4-1 cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: defaultEventLoopGroup-4-1
能夠看到都是在本身開的線程組裏面完成的任務。
(1)從以前的源碼查閱到, context 封裝了handler、pipeline、executor 等信息。 在p.addLast 的時候咱們指定了本身的線程組,查看源碼
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)
@Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
io.netty.channel.DefaultChannelPipeline#newContext
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
能夠看到使用了自定義的線程組。而且記錄到了DefaultChannelHandlerContext 屬性裏。
(2) 不指定線程組,默認使用的是null
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); }
(3) io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
咱們查看next屬性以下:
1》 io.netty.channel.AbstractChannelHandlerContext#executor 獲取executor 方法以下:
@Override public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } }
能夠看到,若是指定了就返回指定的,未指定返回channel 的executor, 也就是IO線程。
2》接下來executor.inEventLoop() 爲false, 因此走else 代碼塊的異步邏輯。
總結:
第一種在handler中添加異步,比較靈活,能夠只將耗時的代碼塊加入異步。異步也會延長接口響應時間,由於須要先加入隊列。
第二種方式是netty的標準方式,至關於整個handler 都異步操做。不論耗時不耗時,都加入隊列異步進行處理。這樣理解清洗,可能不夠靈活。