簡介:java
Netty是一款高效的基於reactor線程模型、異步非阻塞事件驅動的網絡編程框架,主要用於網絡編程,適用於中間件開發組建、傳統應用通信開發。react
Reactor線程模型:線程模型線程模型中有兩種角色,一個是主處理線程(組),一個是工做線程(組),在netty中主處理線程和工做線程都由線程組來維護,主處理線程主要用於處理網絡事件中的Accept事件,工做線程主要用來處理主線程下發下來的網絡連接處理處理任務,如對channel中數據的讀寫等。編程
使用Netty開發網絡應用程序的優點:bootstrap
(1)採用java NIO原生開發網絡應用,成本相對較大,須要熟練掌握NIO編程中各組件的正確使用(如:多路複用器、channel、緩存機制等)、架構設計上線程模型的選擇及實現、編解碼技術、TCP半包處理等等。api
(2)Netty已解決或規避原生java NIO的各類問題或bug,比較醒目的就是多路複用的空輪詢。緩存
(3)Netty趨於成熟,已成爲不少大型底層中間件網絡通信組件(如:Hadoop的RPC框架avro)。網絡
(4)易用性,netty已儘量地對外屏蔽了底層網絡實現的具體細節,已相對友好的方式對外提供API(具體爲ServerBootstrap、Bootstrap、自實現編解碼抽象類、自定義Handler等)。架構
開發案例:框架
Client發出查詢時間的指令,Server端接收指令並簡單判斷後,將當前時間寫入channel。異步
maven pom.xml中引入:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.0.Final</version> </dependency>
輔助類:
package com.best.diamond.netty.time; /** * Created by hengluwen on 17/10/1. */ public class NettyConstants { public static final String BAD_ORDER = "BAD ORDER"; public static final String QUERY_TIME_ORDER = "QUERY TIME ORDER"; }
Netty Server端:
package com.best.diamond.netty.time; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.util.CharsetUtil; /** * Created by hengluwen on 17/10/1. */ public class TimeServer { private static final StringDecoder DECODER = new StringDecoder(CharsetUtil.UTF_8); public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) //服務端的ServerBootstrap中增長了一個方法childHandler,它的目的是添加handler(包括用戶自定義的handler),用來監聽已經鏈接的客戶端的Channel的動做和狀態。 .childHandler(new ChildChannelHandler()); //綁定端口,同步等待成功 ChannelFuture channelFuture = b.bind(port).sync(); //等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //如下兩行代碼爲了解決半包讀問題 pipeline.addLast("framer", new LineBasedFrameDecoder(1024)); pipeline.addLast("decoder", DECODER); pipeline.addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { new TimeServer().bind(8080); } }
服務端代碼中首先建立了兩個NioEventLoopGroup實例,NioEventLoopGroup是個線程組,它包含了一組NIO線程,專門用於網絡事件的處理,實際上它們就是Reactor線程組,建立兩個的緣由是:一個用於服務端接收客戶端的連接,一個用於進行SocketChannel的讀寫。設置建立Channel爲NioServerSocketChannel,它的功能對應於JDK NIO類庫中的ServerSocketChannel類,而後配置NioServerSocketChannel的TCP參數,此處將它的backlog設置爲1024(該參數表示未完成TCP三次握手的連接數與完成TCP三次握手的鏈接數的總和,這個參數會限制client的總連接數),ServerBootstrap是netty服務端啓動輔助類,目的爲下降服務端開發的複雜度。綁定I/O事件的處理類ChailChannelHandler,它的做用相似於Reactor模式中的Handler類,主要用於處理網絡I/O事件。
package com.best.diamond.netty.time; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.Date; /** * Created by hengluwen on 17/10/1. */ public class TimeServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String request) throws Exception { System.out.println("The time server receive order : " + request); String currentTime = NettyConstants.QUERY_TIME_ORDER.equalsIgnoreCase(request) ? new Date().toString() : NettyConstants.BAD_ORDER; sendMessage(ctx, currentTime); } private void sendMessage(ChannelHandlerContext ctx, String data) { ByteBuf byteBuf = Unpooled.copiedBuffer(data.getBytes()); ctx.writeAndFlush(byteBuf); } }
當接收到client發送的數據時,因爲在啓動時已經加入了String類型的消息解碼器(StringDecoder,此消息解碼器由Netty提供,不須要自定義),所以用戶自定義的Handler中接收到的request的數據類型爲String,沒必要再次進行手工解碼。
Netty Client端:
package com.best.diamond.netty.time; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; /** * Created by hengluwen on 17/10/1. */ public class TimeClient { public void connect(String host, int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioServerSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeClientHandler()); } }); ChannelFuture channelFuture = b.connect(new InetSocketAddress(host, port)).sync(); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) { try { new TimeClient().connect("127.0.0.1", 8080); } catch (Exception e) { e.printStackTrace(); } } }
client端的啓動代碼與server相似,因爲client更多的是要處理網絡事件的I/O,且正常狀況下client只會處理一個或兩個channel(channel失效重連時),所以只須要一個線程組便可。
package com.best.diamond.netty.time; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * Created by hengluwen on 17/10/1. */ public class TimeClientHandler extends SimpleChannelInboundHandler<String> { private final ByteBuf firstMessage; public TimeClientHandler() { firstMessage = Unpooled.buffer().writeBytes(NettyConstants.QUERY_TIME_ORDER.getBytes()); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { System.out.println("error" + future.cause()); } } }); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String request) throws Exception { System.out.println("Now is : " + request); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
channel激活時,發出指定,等待服務端的相應。
總結:
(1)netty是一個高效的異步非阻塞的網絡通信框架,開發簡單。
(2)netty易用性強,提供了大量的編碼器/解碼器,已經處理TCP半包問題的輔助類。
(3)netty的api設計的強大而友好。