前面學習了NIO與零拷貝、IO多路複用模型、Reactor主從模型。 服務器基於IO模型管理鏈接,獲取輸入數據,又基於線程模型,處理請求。 下面來學習Netty的具體應用。java
Netty線程模型是創建在Reactor主從模式的基礎上,主從 Rreactor 多線程模型: react
可是在Netty中,bossGroup至關於mainReactor,workerGroup至關於SubReactor與Worker線程池的合體。如:git
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class);
在啓動時,能夠初始化多個線程。bootstrap
EventLoopGroup bossGroup = new NioEventLoopGroup(2); EventLoopGroup workerGroup = new NioEventLoopGroup(3);
下面的例子演示了Netty的簡單使用。服務器
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; /** * EchoServerHandler */ // 標識這類的實例之間能夠在 channel 裏面共享 @ChannelHandler.Sharable public class EchoServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " in.toString(CharsetUtil.UTF_8)); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; /** * Echo服務端 */ public class EchoServer { private final int port; private EchoServer(int port) { this.port = port; } private void start() throws Exception { //建立 EventLoopGroup NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup work = new NioEventLoopGroup(); try { //建立 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(boss, work) //指定使用 NIO 的傳輸 Channel .channel(NioServerSocketChannel.class) //設置 socket 地址使用所選的端口 .localAddress(new InetSocketAddress(port)) //添加 EchoServerHandler 到 Channel 的 ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new EchoServerHandler()); } }); //綁定的服務器;sync 等待服務器關閉 ChannelFuture f = b.bind().sync(); System.out.println(EchoServer.class.getName() " started and listen on " f.channel().localAddress()); //關閉 channel 和 塊,直到它被關閉 f.channel().closeFuture().sync(); } finally { //關機的 EventLoopGroup,釋放全部資源。 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { //設置端口值(拋出一個 NumberFormatException 若是該端口參數的格式不正確) int port = 9999; //服務器start() new EchoServer(port).start(); } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) { System.out.println("Client received: " msg.toString(CharsetUtil.UTF_8)); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; private EchoClient(String host, int port) { this.host = host; this.port = port; } private void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { //建立 Bootstrap Bootstrap b = new Bootstrap(); //指定 EventLoopGroup 來處理客戶端事件。 //因爲使用 NIO 傳輸,因此用到了 NioEventLoopGroup 的實現 b.group(group) //使用的 channel 類型是一個用於 NIO 傳輸 .channel(NioSocketChannel.class) //設置服務器的 InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) //當創建一個鏈接和一個新的通道時,建立添加到 EchoClientHandler 實例 到 channel pipeline .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new EchoClientHandler()); } }); //鏈接到遠程;等待鏈接完成 ChannelFuture f = b.connect().sync(); //阻塞直到 Channel 關閉 f.channel().closeFuture().sync(); } finally { //調用 shutdownGracefully() 來關閉線程池和釋放全部資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { //服務器地址及端口 String host = "localhost"; int port = 9999; new EchoClient(host, port).start(); } }
服務端包含了1個boss NioEventLoopGroup和1個work NioEventLoopGroup。 NioEventLoopGroup至關於1個事件循環組,組內包含多個事件循環(NioEventLoop),每一個NioEventLoop包含1個Selector和1個事件循環線程。多線程
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { //... } });
非當前 Reactor 線程調用 Channel 的各類方法 例如在推送系統的業務線程裏面,根據用戶的標識,找到對應的 Channel 引用,而後調用 Write 類方法向該用戶推送消息,就會進入到這種場景。最終的 Write 會提交到任務隊列中後被異步消費。架構
用戶自定義定時任務異步
ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { //... } }, 60, TimeUnit.SECONDS);
這多是目前最透徹的Netty原理架構解析 Netty 實戰精髓篇 Netty入門教程 Essential Netty in Actionsocket