Netty是一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持,做爲一個異步NIO框架,Netty的全部IO操做都是異步非阻塞的,經過Future-Listener機制,用戶能夠方便的主動獲取或者經過通知機制得到IO操做結果。html
做爲當前最流行的NIO框架,Netty在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,一些業界著名的開源組件也基於Netty的NIO框架構建。react
因此,Netty這個ZB技能必需要學會,不熟悉NIO的能夠先看看深刻淺出NIO Socket。api
Netty中的Reactor模型主要由多路複用器(Acceptor)、事件分發器(Dispatcher)、事件處理器(Handler)組成,能夠分爲三種。安全
一、單線程模型:全部I/O操做都由一個線程完成,即多路複用、事件分發和處理都是在一個Reactor線程上完成的。網絡
對於一些小容量應用場景,可使用單線程模型。可是對於高負載、大併發的應用卻不合適,主要緣由以下:多線程
二、多線程模型:爲了解決單線程模型存在的一些問題,演化而來的Reactor線程模型。架構
多線程模型的特色:併發
在絕大多數場景下,Reactor多線程模型均可以知足性能需求;可是,在極特殊應用場景中,一個NIO線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如百萬客戶端併發鏈接,或者服務端須要對客戶端的握手消息進行安全認證,認證自己很是損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,爲了解決性能問題,產生了第三種Reactor線程模型-主從Reactor多線程模型。框架
三、主從多線程模型:採用多個reactor,每一個reactor都在本身單獨的線程裏執行。若是是多核,則能夠同時響應多個客戶端的請求,一旦鏈路創建成功就將鏈路註冊到負責I/O讀寫的SubReactor線程池上。異步
事實上,Netty的線程模型並不是固定不變,在啓動輔助類中建立不一樣的EventLoopGroup實例並經過適當的參數配置,就能夠支持上述三種Reactor線程模型。正是由於Netty對Reactor線程模型的支持提供了靈活的定製能力,因此能夠知足不一樣業務場景的性能需求。
如下是server和client的示例代碼,其中使用的是 Netty 4.x,先看看如何實現,後續會針對各個模塊進行深刻分析。
server 代碼實現
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void run() throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( //new LoggingHandler(LogLevel.INFO), new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(port).sync(); // (5) // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new EchoServer(port).run(); } }
EchoServerHandler 實現
public class EchoServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger.getLogger( EchoServerHandler.class.getName()); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); ctx.close(); } }
一、NioEventLoopGroup 是用來處理I/O操做的線程池,Netty對 EventLoopGroup 接口針對不一樣的傳輸協議提供了不一樣的實現。在本例子中,須要實例化兩個NioEventLoopGroup,一般第一個稱爲「boss」,用來accept客戶端鏈接,另外一個稱爲「worker」,處理客戶端數據的讀寫操做。
二、ServerBootstrap 是啓動服務的輔助類,有關socket的參數能夠經過ServerBootstrap進行設置。
三、這裏指定NioServerSocketChannel類初始化channel用來接受客戶端請求。
四、一般會爲新SocketChannel經過添加一些handler,來設置ChannelPipeline。ChannelInitializer 是一個特殊的handler,其中initChannel方法能夠爲SocketChannel 的pipeline添加指定handler。
五、經過綁定端口8080,就能夠對外提供服務了。
client 代碼實現
public class EchoClient { private final String host; private final int port; private final int firstMessageSize; public EchoClient(String host, int port, int firstMessageSize) { this.host = host; this.port = port; this.firstMessageSize = firstMessageSize; } public void run() throws Exception { // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( //new LoggingHandler(LogLevel.INFO), new EchoClientHandler(firstMessageSize)); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { final String host = args[0]; final int port = Integer.parseInt(args[1]); final int firstMessageSize; if (args.length == 3) { firstMessageSize = Integer.parseInt(args[2]); } else { firstMessageSize = 256; } new EchoClient(host, port, firstMessageSize).run(); } }
EchoClientHandler 實現
public class EchoClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger.getLogger( EchoClientHandler.class.getName()); private final ByteBuf firstMessage; /** * Creates a client-side handler. */ public EchoClientHandler(int firstMessageSize) { if (firstMessageSize <= 0) { throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize); } firstMessage = Unpooled.buffer(firstMessageSize); for (int i = 0; i < firstMessage.capacity(); i ++) { firstMessage.writeByte((byte) i); } } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); ctx.close(); } }